这篇文章主要介绍“Hadoop中RPC机制分析Server端”,在日常操作中,相信很多人在Hadoop中RPC机制分析Server端问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Hadoop中RPC机制分析Server端”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
1. Server.Listener
RPC Client 端的 RPC 请求发送到 Server 端后, 首先由 Server.Listener 接收
Server.Listener 类继承自 Thread 类, 监听了 OP_READ 和 OP_ACCEPT 事件
Server.Listener 接收 RPC 请求, 在 Server.Listener.doRead() 方法中读取数据, 在 doRead() 方法中又调用了Server.Connection.readAndProcess() 方法,
最后会调用 Server.Connection.processRpcRequest() 方法, 源码如下:
private void processRpcRequest(RpcRequestHeaderProto header,
DataInputStream dis) throws WrappedRpcServerException,
InterruptedException {
...
Writable rpcRequest;
// 从成员变量dis中反序列化出Client端发送来的RPC请求( WritableRpcEngine.Invocation对象 )
try { //Read the rpc request
rpcRequest = ReflectionUtils.newInstance(rpcRequestClass, conf);
rpcRequest.readFields(dis);
} catch (Throwable t) { // includes runtime exception from newInstance
...
}
// 构造Server端Server.Call实例对象
Call call = new Call(header.getCallId(), header.getRetryCount(),
rpcRequest, this, ProtoUtil.convert(header.getRpcKind()), header
.getClientId().toByteArray());
// 将Server.Call实例对象放入调用队列中
callQueue.put(call); // queue the call; maybe blocked here
incRpcCount(); // Increment the rpc count
}
调用队列 callQueue 是 Server 的成员变量, Server.Listener 和 Server.Handler 是典型的生产者, 消费者模型,
Server.Listener( 生产者 )的doRead()方法最终调用Server.Connection.processRpcRequest() 方法,
而Server.Handler( 消费者 )处理RPC请求
2. Server.Handler 继承 Thread 类, 其主要工作是处理 callQueue 中的调用, 都在 run() 方法中完成. 在 run() 的主循环中, 每次处理一个从 callQueue 中出队的请求, Server.call() 是一个抽象方法, 实际是调用了 RPC.Server.call()方法, 最后通过 WritableRPCEngine.call() 方法完成 Server 端方法调用
/** Handles queued calls . */
private class Handler extends Thread {
...
@Override
public void run() {
...
ByteArrayOutputStream buf =
new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE);
while (running) {
...
final Call call = callQueue.take(); // 获取一个RPC调用请求
...
Writable value = null;
value = call.connection.user.doAs(new PrivilegedExceptionAction<Writable>() {
@Override
public Writable run() throws Exception {
// 调用RPC.Server.call()方法
// call.rpcKind : RPC调用请求的类型, 一般为Writable
// call.connection.protocolName : RPC协议接口的类名
// call.rpcRequest : Invocation实例对象, 包括方法名, 参数列表, 参数列表的Class对象数组
// call.timestamp : 调用时间戳
return call(call.rpcKind, call.connection.protocolName,
call.rpcRequest, call.timestamp);
}
});
}
...
}
}
RPC.Server.call() 方法如下:
@Override
public Writable call(RPC.RpcKind rpcKind, String protocol,
Writable rpcRequest, long receiveTime) throws Exception {
return getRpcInvoker(rpcKind).call(this, protocol, rpcRequest,
receiveTime);
}
最后通过 WritableRPCEngine.call() 方法完成 Server 端方法调用, 代码如下:
@Override
public Writable call(org.apache.hadoop.ipc.RPC.Server server,
String protocolName, Writable rpcRequest, long receivedTime)
throws IOException, RPC.VersionMismatch {
Invocation call = (Invocation)rpcRequest; // 将RPC请求强制转成WritableRpcEngine.Invocation对象
...
long clientVersion = call.getProtocolVersion();
final String protoName;
ProtoClassProtoImpl protocolImpl; // Server端RPC协议接口的实现类的实例对象
...
// Invoke the protocol method
try {
...
// 获取RPC请求中调用的方法对象Method
Method method =
protocolImpl.protocolClass.getMethod(call.getMethodName(),
call.getParameterClasses());
method.setAccessible(true);
...
// 在Server端RPC协议接口的实现类的实例对象 protocolImpl 上调用具体的方法
Object value =
method.invoke(protocolImpl.protocolImpl, call.getParameters());
...
// 调用正常结束, 返回调用结果
return new ObjectWritable(method.getReturnType(), value);
} catch (InvocationTargetException e) { // 调用出现异常, 用IOException包装异常, 最后抛出该异常
Throwable target = e.getTargetException();
if (target instanceof IOException) {
throw (IOException)target;
} else {
IOException ioe = new IOException(target.toString());
ioe.setStackTrace(target.getStackTrace());
throw ioe;
}
} catch (Throwable e) {
...
}
}
}
在 WritableRpcEngine.call() 方法中, 传入的 rpcRequest 会被强制转换成 WritableRpcEngine.Invocation 类型的对象 call , 并通过 call 这个对象包含的方法名(getMethodName()方法)和参数列表的 Class对象数组(getParameterClasses())获取 Method 对象, 最终通过 Method 对象的invoke() 方法, 调用实现类的实例对象 protocolImpl 上的方法, 完成 Hadoop 的远程过程调用
好了, 现在 Server 端的具体方法已经被调用了, 调用结果分两种情况:
1) 调用正常结束, 则将方法的返回值和调用结果封装成一个 ObjectWritable 类型的对象, 并返回
2) 调用出现异常, 抛出 IOException 类型的异常
3. Server.Responder
这个类的功能: 发送 Hadoop 远程过程调用的应答给 Client 端, Server.Responder 类继承自 Thread 类, 监听了 OP_WRITE 事件, 即通道可写. 具体细节写不下去了
到此,关于“Hadoop中RPC机制分析Server端”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注天达云网站,小编会继续努力为大家带来更多实用的文章!