Hadoop RPC机制+源码分析

 一、RPC基本原理


RPC是一种通过网络从远程计算机上请求服务的机制,封装了具体实现,使用户不需要了解底层网络技术。目前存在许多开源RPC框架,比较有名的有Thrift、Protocol Buffers和Avro。Hadoop RPC与他们一样,均由两部分组成:对象序列化和远程过程调用。
RPC采用客户机/服务器模型,在OSI网络通信模型中,RPC跨越了传输层和应用层,它使得开发分布式应用程序更加容易。一个典型的RPC框架主要包括以下几个部分:
1.通信模块:两个相互协作的通信模块实现请求应答协议,他们在客户机与服务器之间传递请求和应答消息,一般不会对数据包进行任何处理。
注意:请求应答协议有两种方式:同步与异步。同步指客户端程序一致阻塞到服务器端发送的应答请求到达本地。异步指客户端将请求发送到服务器端后,不必等待应答返回,可以继续做其他的事情,带服务器端处理完请求后,主动通知客户端。在高并发情景中,异步方式可以有效降低访问延迟和提高宽带利用率。

2.Stub程序:即代理程序,客户端与服务器端皆有。它是的远程函数调用表现的跟本地调用一样,对用户程序完全透明。在客户端,它将请求信息通过网络模块发送给服务器端,服务器响应后,会解码对应结果。在服务器端,他负责接码请求消息中的参数、调用相应的服务过程和编码应答结果的返回值。这个过程实际上是应用了代理模式。

3.调度程序:负责接收来自通信模块的请求权信息,并根据其中的标志选择一个Stub程序进行处理。当客户端并发请求量比较大时,采用线程池提高处理效率。
4.客户程序/服务过程:即请求的发出者与请求的处理者。
由上可知,一个RPC请求从发送到获取处理结果的过程如下:
1.客户程序以本地方式调用系统产生的Stub程序
2.Stub程序按照函数调用信息按照通信模块的要求封装成消息包,并交给通信模块发送到远程服务器端
3.远程服务器端接收此消息后,将消息发送给相应的Stub程序
4.Stub程序解码消息,形成被调过程要求的形式,并调用相应的函数
5.被调用函数按照所获参数执行,并将结果返回给Stub程序
6.Stub程序将返回结果封装成消息,通过网络通信模块逐级传送给客户程序

 - 二、Hadoop RPC使用


Hadoop RPC对外提供两种接口
1.public static VersionedProtocal getProxy/waitForProxy()
构造一个客户端对象(实现了某VersionedProtocal协议),用于向服务器端发送RPC请求。
2.public static Server getServer()
为某个协议实例构造一个服务器对象,用于处理客户端发送的请求
下面举例说明使用方法:
1.首先需要定义客户端与服务器之间的协议。先自定义一个RPC接口,继承VersionedProtocal接口


```
public interface ClientProtocol extends VersionedProtocol {
    public static final long versionID = 1L;

    public String echo(String value) throws IOException;

    /**
     * 定义两个数相加的协议
     * 
     * @param v1
     * @param v2
     * @return 两数相加的和
     * @throws IOException
     */
    public int add(int v1, int v2) throws IOException;

}
```



2.实现RPC协议,也就是自定义接口的具体实现


```
public class ClientProtocolImpl implements ClientProtocol {

    public long getProtocolVersion(String protocol, long clientVersion)
            throws IOException {
        return ClientProtocol.versionID;
    }

    public String echo(String value) throws IOException {
        return value;
    }

    public int add(int v1, int v2) throws IOException {
        return v1 + v2;
    }

}

```



3.构造一个RPC server并启动


```
public class ServerTest {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        //这里使用本地主机为服务器,端口号为8088,使用3个handler
        Server server = (Server) RPC.getServer(new ClientProtocolImpl(),
                "127.0.0.1", 8088, 3, false, conf);
        server.start();
    }
}
```


4.构造RPC client,并发送RPC请求。这里使用静态方法getProxy()构造客户端代理对象,通过代理对象调用远程端的方法。


```
public class ClientTest {
    public static void main(String[] args) throws IOException {
        ClientProtocol proxy = (ClientProtocol) RPC.getProxy(
                ClientProtocol.class, ClientProtocol.versionID,
                new InetSocketAddress("127.0.0.1", 8088), new Configuration());
        //通过代理调用远程方法
        int result = proxy.add(4, 5);
        String echoResult = proxy.echo("result");
        System.out.println("result=" + result + "\nechoResult=" + echoResult);
    }
}
```



运行结果
服务器端:


```
15/04/27 11:19:38 INFO ipc.Server: Starting SocketReader
15/04/27 11:19:38 INFO ipc.Server: IPC Server Responder: starting
15/04/27 11:19:38 INFO ipc.Server: IPC Server listener on 8088: starting
15/04/27 11:19:38 INFO ipc.Server: IPC Server handler 0 on 8088: starting
15/04/27 11:19:38 INFO ipc.Server: IPC Server handler 1 on 8088: starting
15/04/27 11:19:38 INFO ipc.Server: IPC Server handler 2 on 8088: starting

```



显示启动了三个server handler
客户端:


```
result=9
echoResult=result

```



- 三、Hadoop RPC源码分析

Hadoop RPC主要由三大类组成,分别是RPC(对外编程接口)、Client(客户端实现)、Server(服务端实现)
1.org.apache.hadoop.ipc.RPC分析

![Hadoop RPC类图](http://img.blog.csdn.net/20150427165334866)
RPC类自定义了许多内部类,如上图的Server、ClientCache、Invoker等。
RPC.Server类给出了Server抽象类中call函数的具体实现,它根据请求中的调用方法名称和对应参数完成方法调用。



```
 public Writable call(Class<?> protocol, Writable param, long receivedTime) 
    throws IOException {
      try {
        Invocation call = (Invocation)param;
        if (verbose) log("Call: " + call);
        //根据远程调用信息,解析出方法名、参数列表等信息
        Method method =
          protocol.getMethod(call.getMethodName(),
                                   call.getParameterClasses());
        method.setAccessible(true);

        long startTime = System.currentTimeMillis();
        //根据method完成方法调用
        Object value = method.invoke(instance, call.getParameters());
        int processingTime = (int) (System.currentTimeMillis() - startTime);
        int qTime = (int) (startTime-receivedTime);
        if (LOG.isDebugEnabled()) {
          LOG.debug("Served: " + call.getMethodName() +
                    " queueTime= " + qTime +
                    " procesingTime= " + processingTime);
        }
        rpcMetrics.addRpcQueueTime(qTime);
        rpcMetrics.addRpcProcessingTime(processingTime);
        rpcMetrics.addRpcProcessingTime(call.getMethodName(), processingTime);
        if (verbose) log("Return: "+value);

        return new ObjectWritable(method.getReturnType(), value);

      } catch (InvocationTargetException e) {
        Throwable target = e.getTargetException();
        if (target instanceof IOException) {
          throw (IOException)target;
        } else {
          IOException ioe = new IOException(target.toString());
          ioe.setStackTrace(target.getStackTrace());
          throw ioe;
        }
      } catch (Throwable e) {
        if (!(e instanceof IOException)) {
          LOG.error("Unexpected throwable object ", e);
        }
        IOException ioe = new IOException(e.toString());
        ioe.setStackTrace(e.getStackTrace());
        throw ioe;
      }
    }
  }
```



RPC类还有一个包含ClientCache内部类,它根据用户提供的SocketFactory缓存Client,以达到重用Client对象的目的。


```
private synchronized Client getClient(Configuration conf,
        SocketFactory factory) {
      //client是一个键为SocketFactory,值为Client对象的hashmap,利用这个hash表完成缓存工作    
      Client client = clients.get(factory);
      if (client == null) {
        client = new Client(ObjectWritable.class, conf, factory);
        clients.put(factory, client);
      } else {
        client.incCount();
      }
      return client;
    }
```



Hadoop主要使用Java动态代理的思想实现对远程方法的调用,用户只需要实现InvocationHandler接口,并按需求实现invoke方法即可。在Hadoop中,由Invoker类完成



```
 public Object invoke(Object proxy, Method method, Object[] args)
      throws Throwable {
      final boolean logDebug = LOG.isDebugEnabled();
      long startTime = 0;
      if (logDebug) {
        startTime = System.currentTimeMillis();
      }
      //将参数、函数信息打包成Invocation对象,通过网络发送到服务器,服务器接收到调用信息后,解析出函数名和参数等信息,利用Java反射机制完成函数调用
      ObjectWritable value = (ObjectWritable)
        client.call(new Invocation(method, args), remoteId);
      if (logDebug) {
        long callTime = System.currentTimeMillis() - startTime;
        LOG.debug("Call: " + method.getName() + " " + callTime);
      }
      return value.get();
    }
```



2.org.apache.hadoop.ipc.Client分析
![Hadoop Client类图](http://img.blog.csdn.net/20150427173149766)
Client类的主要功能是发送远程过程调用信息并接收执行结果,主要提供两个接口:


```
public Writable call(Writable param, InetSocketAddress addr,
                       Class<?> protocol, UserGroupInformation ticket,
                       int rpcTimeout, Configuration conf)
public Writable[] call(Writable[] params, InetSocketAddress[] addresses,
      Class<?> protocol, UserGroupInformation ticket, Configuration conf)
```



这两个接口分别是执行单个远程调用和批量执行远程调用
Client有两个重要的内部类,分别是Call和Connection,Call封装了一系列请求参数,其中id用于唯一标识每一次的远程函数调用;Connection类是Client类与Server之间的通信连接,保存了基本的连接信息,包括remoteId、socket、输入输出流、RPC请求的hash(Calls)
当调用    Call函数执行某个远程方法时,Client端步骤如下:
a.创建一个Connection对象,并将远程方法调用信息封装成Call对象,存入Connection的hash表中;
b.启动C    onnection线程,当检查到有消息传送时,通过receiveResponse()接收结果。调用Connection类中的sendParam()方法将当前的Call对象发送给Server端;
c.Server端处理完RPC请求后,将结果通过网路返回给Client端,Client端waitForWork()为true,调用receiveResponse()接收结果;
d.Client端检查处理结果状态,并将其从Connection的hash表中删除。


```
 public Writable call(Writable param, ConnectionId remoteId)  
                       throws InterruptedException, IOException {
    Call call = new Call(param);
    Connection connection = getConnection(remoteId, call);
    connection.sendParam(call);                 // send the parameter
    boolean interrupted = false;
    synchronized (call) {
      while (!call.done) {
        try {
          call.wait();                           // wait for the result
        } catch (InterruptedException ie) {
          // save the fact that we were interrupted
          interrupted = true;
        }
      }

      if (interrupted) {
        // set the interrupt flag now that we are done waiting
        Thread.currentThread().interrupt();
      }

      if (call.error != null) {
        if (call.error instanceof RemoteException) {
          call.error.fillInStackTrace();
          throw call.error;
        } else { // local exception
          // use the connection because it will reflect an ip change, unlike
          // the remoteId
          throw wrapException(connection.getRemoteAddress(), call.error);
        }
      } else {
        return call.value;
      }
    }
  }

```



2.org.apache.hadoop.ipc.Server分析 
Server主要是接收来自客户端的请求,通过调用相应的函数获取结果后,返回给相应的客户端。Server主要被划分为三个阶段:接收请求、处理请求和返回结果
a.接受请求:接收来自客户端的RPC请求,并封装成Call对象,放到共享队列callQueue中,由Listener和Reader两种线程完成;
b.处理请求:从callQueue中获取Call对象,执行函数调用,这个工作交个Handler线程完成;
c.返回结果:Handler线程执行完函数调用后,将执行结果返回给客户端,如果返回结果过大或网络异常,会将发送任务交给Responder线程
实际上,Server端运用了Reactor模式,在此不赘述。

Para obtener información más completa sobre las optimizaciones del compilador, consulte nuestro Aviso de optimización.