这篇文章将为大家详细讲解有关Java中RPC的原理是什么,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。
package com.wish.RPC;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* RPC原理解析:
* 服务器端:
* 1、RPCServer#registService:主要作用就是提供了一个服务注册管理中心,
* 用来保存被注册服务(如果是dubbo则是分布式服务框架,对应了不同机器的地址及端口发布的服务(dubbo还使用了zookeeper))
* 2、RPCServer#startServer:开启一个ServerSocket连接(new 一个ServiceTask服务,使用线程循环监听等待),
* 等待客户端的远程socket连接调用
* 3、RPCServer#registService:定义一个注册服务接口。即将所有需要注册的服务保存起来,后续ServiceTask需要使用该接口对象,
* 动态代理调用该接口对象方法,并将方法返回值通过socket网络通信方式,传递给该服务的Client客户端。
*
* 客户端:
* 1、RPCClient#findService:根据serviceInterface接口名,通过动态代理生成被请求对象及通过InvocationHandler调用远程方法。
* 其中InvocationHandler里面,通过传入的ip和prot地址,开启一个socket连接,远程发送调用远端RPCServer注册的服务方法
* 然后通过远端RPCServer,的socket连接,讲返回对象通过socket网络通信传递过来,这样即获取到了远端服务的返回结果。
*
* 启动服务端:
* 1、TestRPCServer#main:启动服务端,通过server.registService(new HelloWorld()) ;
* 注册HelloWorld服务方法到RPCServer
* 2、TestRPCServer#main:通过server.startServer(51234);启动RPCServer,监听来自client的socket请求
*
* 启动客户端:
* 1、TestRPCClient#main:通过RPCClient.findService("127.0.0.1" , 51234 , IHelloWorld.class);
* 调用客户端findService,获取HelloWorld对象,接下来即可以像使用本地一样使用远程服务方法
*
* PS:更多源码请访问:http://git.oschina.net/tantexian/wishRPC
*
* @author tantexian<tantexian@qq.com>
* @since 2016年5月27日 上午9:44:46
*/
public class RPCServer {
private static final ExecutorService taskPool = Executors.newFixedThreadPool(50);
/**
* 服务接口对象库 key:接口名 value:接口实现
*/
private static final ConcurrentHashMap<String, Object> serviceTargets = new ConcurrentHashMap<String, Object>();
private static AtomicBoolean run = new AtomicBoolean(false);
/**
* 注册服务
*
* @param service
*/
public void registService(Object service) {
Class<?>[] interfaces = service.getClass().getInterfaces();
if (interfaces == null) {
throw new IllegalArgumentException("服务对象必须实现接口");
}
Class<?> interfacez = interfaces[0];
String interfaceName = interfacez.getName();
serviceTargets.put(interfaceName, service);
}
/**
* 启动Server
*
* @param port
*/
public void startServer(final int port) {
Runnable lifeThread = new Runnable() {
@Override
public void run() {
ServerSocket lifeSocket = null;
Socket client = null;
ServiceTask serviceTask = null;
try {
lifeSocket = new ServerSocket(port);
run.set(true);
while (run.get()) {
client = lifeSocket.accept();
serviceTask = new ServiceTask(client);
serviceTask.accept();
}
} catch (IOException e) {
e.printStackTrace();
}
}
};
taskPool.execute(lifeThread);
System.out.println("服务启动成功...");
}
public void stopServer() {
run.set(false);
taskPool.shutdown();
}
public static final class ServiceTask implements Runnable {
private Socket client;
public ServiceTask(Socket client) {
this.client = client;
}
public void accept() {
taskPool.execute(this);
}
@Override
public void run() {
InputStream is = null;
ObjectInput oi = null;
OutputStream os = null;
ObjectOutput oo = null;
try {
is = client.getInputStream();
os = client.getOutputStream();
oi = new ObjectInputStream(is);
String serviceName = oi.readUTF();
String methodName = oi.readUTF();
Class<?>[] paramTypes = (Class[]) oi.readObject();
Object[] arguments = (Object[]) oi.readObject();
System.out.println("serviceName:" + serviceName + " methodName:" + methodName);
Object targetService = serviceTargets.get(serviceName);
if (targetService == null) {
throw new ClassNotFoundException(serviceName + "服务未找到!");
}
Method targetMethod = targetService.getClass().getMethod(methodName, paramTypes);
Object result = targetMethod.invoke(targetService, arguments);
oo = new ObjectOutputStream(os);
oo.writeObject(result);
} catch (IOException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (SecurityException e) {
e.printStackTrace();
} catch (NoSuchMethodException e) {
e.printStackTrace();
} catch (IllegalArgumentException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
} finally {
try {
if (oo != null) {
oo.close();
}
if (os != null) {
os.close();
}
if (is != null) {
is.close();
}
if (oi != null) {
oi.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
package com.wish.RPC;
import java.io.InputStream;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.Socket;
public class RPCClient {
/**
* 根据接口类型得到代理的接口实现
* @param <T>
* @param host RPC服务器IP
* @param port RPC服务端口
* @param serviceInterface 接口类型
* @return 被代理的接口实现
*/
@SuppressWarnings("unchecked")
public static <T> T findService(final String host , final int port ,final Class<T> serviceInterface){
return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(), new Class[]{serviceInterface}, new InvocationHandler() {
@SuppressWarnings("resource")
@Override
public Object invoke(final Object proxy, final Method method, final Object[] args)
throws Throwable {
Socket socket = null ;
InputStream is = null ;
OutputStream os = null ;
ObjectInput oi = null ;
ObjectOutput oo = null ;
try {
socket = new Socket(host, port) ;
os = socket.getOutputStream() ;
oo = new ObjectOutputStream(os);
oo.writeUTF(serviceInterface.getName()) ;
oo.writeUTF(method.getName()) ;
oo.writeObject(method.getParameterTypes()) ;
oo.writeObject(args);
is = socket.getInputStream() ;
oi = new ObjectInputStream(is) ;
return oi.readObject() ;
} catch (Exception e) {
System.out.println("调用服务异常...");
return null ;
}finally{
if(is != null){
is.close() ;
}
if(os != null){
is.close() ;
}
if(oi != null){
is.close() ;
}
if(oo != null){
is.close() ;
}
if(socket != null){
is.close() ;
}
}
}
});
}
}
package com.wish.RPC;
public class HelloWorld implements IHelloWorld {
@Override
public String sayHello(String name) {
return "Hello, " + name;
}
}
package com.wish.RPC;
public interface IHelloWorld {
String sayHello(String name);
}
package com.wish.RPC;
public class TestRPCServer {
public static void main(String[] args) {
RPCServer server = new RPCServer() ;
server.registService(new HelloWorld()) ;
server.startServer(51234) ;
}
}
package com.wish.RPC;
public class TestRPCClient {
public static void main(String[] args) {
IHelloWorld helloWorld =
RPCClient.findService("127.0.0.1" , 51234 , IHelloWorld.class) ;
String result = helloWorld.sayHello("tantexian, My blog address is: http://my.oschina.net/tantexian/");
System.out.println(result );
}
}
关于Java中RPC的原理是什么就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。