专栏目录:*
Dubbo基础: https://blog.csdn.net/qq_38310244/article/details/125891802
Dubbo实战: https://blog.csdn.net/qq_38310244/article/details/125892120
手写一套简单的dubbo(含注册中心)之编程思想: https://blog.csdn.net/qq_38310244/article/details/125892641
手写一套简单的dubbo(含注册中心)之核心代码: https://blog.csdn.net/qq_38310244/article/details/125892849
既然有思路,疑问点都没问题了,就可以开始打码了
代码已上传到GitHub:https://github.com/simple-mine/rpc
创建一个干净的Maven项目,方便依赖的管理
首先,我们需要标记出哪些类需要进行代理,还需要声明项目是否使用我们开发的rpc框架。
EnableRPC
定义一个注解类EnableRPC,标识服务消费者是否启用RPC,并在注解类中定义referencePackage属性,标识哪些类需要生成代理,定义clientRegisterIp声明注册中心的地址
package com.simple.rpc.annotations;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
 * Marks a consumer application's startup class as using this RPC framework.
 *
 * @author huangjun
 * @version 1.0
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface EnableRPC {

    /**
     * Package on the consumer side that is scanned for fields needing a remote proxy.
     */
    String referencePackage() default "";

    /**
     * IP address of the registry.
     */
    String clientRegisterIp() default "";
}
StratRPC
定义注解类StratRPC,标识服务提供者是否启用RPC,并在注解类中定义registerPackage属性,标识哪些类是接口的实现类(需要生成代理的类),serverPort属性标识服务监听的端口(服务启动后,需要一个端口监听来自服务消费者的请求),serverRegisterIp声明注册中心地址
package com.simple.rpc.annotations;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
 * Marks a provider application's startup class as using this RPC framework.
 *
 * @author huangjun
 * @version 1.0
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface StratRPC {

    /**
     * Package that contains the service implementations to register.
     */
    String registerPackage() default "";

    /**
     * Port the provider listens on for consumer requests.
     */
    int serverPort() default 0;

    /**
     * IP address of the registry.
     */
    String serverRegisterIp() default "";
}
RegisterService
定义RegisterService注解类,用于标识服务提供者中哪些类是真正需要生成代理对象的。虽然前面已经指定了需要生成代理类的包,但该包路径下的类不一定都需要生成代理对象,这里是对需要生成代理对象的类做进一步细化
package com.simple.rpc.annotations;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
 * Marks a provider-side class as a service implementation that should be
 * registered and exposed to remote consumers.
 *
 * @author huangjun
 * @version 1.0
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface RegisterService {

    /**
     * Optional name of the service; defaults to the empty string.
     */
    String name() default "";
}
Reference
定义注解类Reference,用于标识服务消费者哪些类里需要引用到远程服务提供者提供的服务,在Springboot启动时,为这些成员变量注入代理对象
package com.simple.rpc.annotations;
import java.lang.annotation.*;
/**
 * Marks a consumer-side field that should receive a proxy for a remote service.
 *
 * @author huangjun
 * @version 1.0
 */
@Documented
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Reference {

    /**
     * Optional name of the referenced service; defaults to the empty string.
     */
    String name() default "";
}
AbstractInterfaceFactory
创建服务消费者创建代理对象工厂抽象类AbstractInterfaceFactory,该类用于创建服务消费者需要生成代理对象的成员变量。该类定义了一个调用远程服务的抽象方法,并定义内部类默认实现该抽象方法的具体实现方式。
package com.simple.rpc.factory;
/**
* @Author: huangjun
* @Date: 2022/7/11 12:00
* @Version 1.0
*/
import com.simple.rpc.socket.SocketRequest;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.logging.Logger;
/**
 * Consumer-side proxy factory.
 *
 * <p>{@link #getWebService(Class)} returns a JDK dynamic proxy for the given
 * interface; every interface method call on that proxy is forwarded to
 * {@link #remoteCall(Method, Object[])}. The default implementation returned by
 * {@link #getInstance()} delegates to {@code SocketRequest.sendRequest}.
 */
public abstract class AbstractInterfaceFactory {
    protected static final Logger logger = Logger.getLogger(AbstractInterfaceFactory.class.getName());

    /**
     * @return a new factory backed by the default socket-based remote call
     */
    public static AbstractInterfaceFactory getInstance() {
        return new DefaultInterfaceFactory();
    }

    protected AbstractInterfaceFactory() {
    }

    /**
     * Creates a proxy implementing {@code oldInterface}.
     *
     * @param oldInterface the interface to proxy
     * @param <T>          the interface type
     * @return a proxy whose method calls are routed through {@link #remoteCall(Method, Object[])}
     */
    @SuppressWarnings("unchecked") // the proxy implements exactly oldInterface, so the cast is safe
    public <T> T getWebService(Class<T> oldInterface) {
        // A single handler is enough; the original created a second, unused instance.
        InterfaceHandler handler = new InterfaceHandler(this);
        return (T) Proxy.newProxyInstance(getClass().getClassLoader(), new Class<?>[]{oldInterface}, handler);
    }

    /**
     * Performs the actual remote invocation; implemented by subclasses.
     *
     * @param methodName the interface method being invoked
     * @param args       the call arguments ({@code null} for a no-arg method)
     * @return the value produced by the remote call
     */
    protected abstract Object remoteCall(Method methodName, Object[] args);

    /**
     * Invocation handler installed on every proxy created by this factory.
     */
    private static final class InterfaceHandler implements InvocationHandler {
        private final AbstractInterfaceFactory factory;

        public InterfaceHandler(AbstractInterfaceFactory factory) {
            this.factory = factory;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) {
            // hashCode/equals/toString are dispatched to the handler with Object as the
            // declaring class. They are not remote services, so answer them locally
            // instead of failing the provider lookup.
            if (method.getDeclaringClass() == Object.class) {
                return invokeObjectMethod(proxy, method, args);
            }
            String remoteMethodName = method.getName();
            logger.info("开始调用接口:" + remoteMethodName);
            Object rst = factory.remoteCall(method, args);
            logger.info("完成调用");
            return rst;
        }

        /**
         * Identity-based implementations of the three {@code Object} methods a proxy forwards.
         */
        private static Object invokeObjectMethod(Object proxy, Method method, Object[] args) {
            switch (method.getName()) {
                case "equals":
                    return proxy == args[0];
                case "hashCode":
                    return System.identityHashCode(proxy);
                case "toString":
                    return proxy.getClass().getName() + "@" + Integer.toHexString(System.identityHashCode(proxy));
                default:
                    throw new UnsupportedOperationException(method.toString());
            }
        }
    }

    /**
     * Default factory implementation: forwards calls to {@code SocketRequest.sendRequest}.
     */
    private static final class DefaultInterfaceFactory extends AbstractInterfaceFactory {
        @Override
        protected Object remoteCall(Method method, Object[] args) {
            logger.info("远程方法调用中.....");
            return SocketRequest.sendRequest(method, args);
        }
    }
}
ProxyConfig
创建ProxyConfig类,用于扫描服务消费者哪些类需要注入或实例化代理对象
package com.simple.rpc.config;
import com.simple.rpc.annotations.Reference;
import com.simple.rpc.factory.AbstractInterfaceFactory;
import org.reflections.Reflections;
import org.reflections.scanners.FieldAnnotationsScanner;
import org.reflections.util.ClasspathHelper;
import org.reflections.util.ConfigurationBuilder;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
import java.lang.reflect.Field;
import java.util.Set;
/**
 * Scans a package for fields annotated with {@code @Reference} and assigns a
 * remote-call proxy to each of them.
 *
 * @author huangjun
 * @version 1.0
 */
@Component
public class ProxyConfig {

    /**
     * Injects a proxy into every {@code @Reference} field found under {@code referencePackage}.
     *
     * <p>NOTE(review): when {@code applicationContext} is {@code null} the proxy is set on a
     * freshly created instance that is not returned or stored here — confirm that is intended.
     *
     * @param applicationContext Spring context used to look up the bean owning each field, or
     *                           {@code null} to instantiate the owning class directly
     * @param referencePackage   package to scan
     */
    public static void initReference(ApplicationContext applicationContext, String referencePackage) {
        // Restrict the reflection scan to the given package and to field annotations only.
        ConfigurationBuilder scanConfig = new ConfigurationBuilder()
                .setUrls(ClasspathHelper.forPackage(referencePackage))
                .setScanners(new FieldAnnotationsScanner());
        Set<Field> annotatedFields = new Reflections(scanConfig).getFieldsAnnotatedWith(Reference.class);

        for (Field target : annotatedFields) {
            if (target.getAnnotation(Reference.class) == null) {
                continue;
            }
            try {
                // Build the proxy for the field's declared interface type.
                Object remoteProxy = AbstractInterfaceFactory.getInstance().getWebService(target.getType());
                // Private fields must be made writable before assignment.
                target.setAccessible(true);
                Class<?> owner = target.getDeclaringClass();
                Object holder = (applicationContext == null)
                        ? owner.newInstance()
                        : applicationContext.getBean(owner);
                target.set(holder, remoteProxy);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
ServerInterfaceFactory
创建ServerInterfaceFactory类,用于为服务提供者创建代理对象
package com.simple.rpc.factory;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
/**
 * Provider-side invocation handler that forwards calls to a wrapped target object.
 *
 * @author huangjun
 * @version 1.0
 */
public class ServerInterfaceFactory implements InvocationHandler {

    /** The object every call is delegated to. */
    private final Object target;

    /**
     * @param target the service implementation to delegate to
     */
    public ServerInterfaceFactory(Object target) {
        this.target = target;
    }

    /**
     * Invokes {@code method} on the target. Any {@link Exception} raised by the reflective
     * call is printed and swallowed, in which case {@code null} is returned.
     */
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        Object outcome;
        try {
            outcome = method.invoke(target, args);
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println( "方法调用出现了异常");
            outcome = null;
        }
        // Printed on both the success and the failure path.
        System.out.println( "方法调用完成!");
        return outcome;
    }

    /**
     * @return a JDK proxy implementing the interfaces of the target's class, handled by this object
     */
    public Object getProxy() {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        Class<?>[] exposed = target.getClass().getInterfaces();
        return Proxy.newProxyInstance(loader, exposed, this);
    }
}
SocketRequest
创建SocketRequest类,用于实现服务消费者调用远程服务提供者的具体实现(通过Socket方式进行调用)
package com.simple.rpc.socket;
import com.simple.rpc.main.RPCMain;
import com.simple.util.entiy.TCPConfig;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.Socket;
/**
 * Consumer-side transport: sends one remote call over a plain socket using Java
 * serialization and returns the deserialized response.
 *
 * @author huangjun
 * @version 1.0
 */
public class SocketRequest {

    /**
     * Looks up the provider for {@code method} in the local service table and invokes it remotely.
     *
     * @param method the interface method being called
     * @param args   the call arguments ({@code null} for a no-arg method)
     * @return the object returned by the provider
     * @throws RuntimeException if no provider is known for the method or the call fails
     */
    public static Object sendRequest(Method method, Object[] args){
        String methodName = method.getName();
        // Services are registered under "<canonical interface name>.<method>", so the lookup
        // key must use the canonical name too; getTypeName() yields "Outer$Inner" for nested
        // interfaces and would never match.
        String serverName = method.getDeclaringClass().getCanonicalName() + "." + methodName;
        TCPConfig tcpConfig = RPCMain.getTcpMap().get(serverName);
        if (tcpConfig == null){
            throw new RuntimeException("can not find provider");
        }
        return sendAndReceive(tcpConfig.getIp(), tcpConfig.getPort(), serverName, methodName, args);
    }

    /**
     * Opens a connection, writes service name, method name and arguments, then reads one
     * response object.
     */
    private static Object sendAndReceive(String ip, int port, String serverName, String methodName, Object[] args){
        try (Socket client = new Socket(ip, port)) {
            // Request: three serialized objects in a fixed order.
            ObjectOutputStream objectOutputStream = new ObjectOutputStream(client.getOutputStream());
            objectOutputStream.writeObject(serverName);
            objectOutputStream.writeObject(methodName);
            objectOutputStream.writeObject(args);
            objectOutputStream.flush();
            // Response: a single serialized object.
            // NOTE(review): Java deserialization of data from the network is unsafe if the
            // peer is not trusted.
            ObjectInputStream objectInputStream = new ObjectInputStream(client.getInputStream());
            return objectInputStream.readObject();
        } catch (Exception e) {
            // Keep the original failure as the cause instead of only printing it.
            throw new RuntimeException("invoke error", e);
        }
    }
}
Execute
创建Execute类,该类创建一个线程监听对外暴露的端口,实现处理远程消费者调用方法时,通过代理对象找到服务提供者对应的实现方法,并执行后返回给服务消费者
package com.simple.rpc.main;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.simple.rpc.factory.ServerInterfaceFactory;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.lang.reflect.Method;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.Map;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * Provider-side listener: accepts consumer connections on the exposed port, runs the
 * requested service method and writes the result back.
 *
 * @author huangjun
 * @version 1.0
 */
public class Execute {
    private static ThreadFactory namedFactory = new ThreadFactoryBuilder().setNameFormat("provid-thread-%d").build();
    private static ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new
            LinkedTransferQueue<>(), namedFactory, new ThreadPoolExecutor.AbortPolicy());

    /**
     * Starts the accept loop on a pool thread.
     *
     * @param ip         local address to bind
     * @param serverPort port to listen on
     */
    public static void startTpcServer(String ip, int serverPort){
        threadPool.execute(() -> executeRequest(ip, serverPort));
    }

    /**
     * Binds the server socket and handles connections one at a time until the socket fails.
     */
    private static void executeRequest(String ip, int serverPort){
        try {
            InetAddress serverAddress = InetAddress.getByName(ip);
            try (ServerSocket service = new ServerSocket(serverPort, 10, serverAddress)) {
                while (true) {
                    // accept() blocks until a consumer connects.
                    try (Socket connect = service.accept()) {
                        handleConnection(connect);
                    } catch (Throwable e) {
                        // A failing request must not stop the accept loop.
                        e.printStackTrace();
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        } catch (UnknownHostException e) {
            e.printStackTrace();
        }
    }

    /**
     * Reads one request (service name, method name, arguments) and writes one response object.
     */
    private static void handleConnection(Socket connect) throws Throwable {
        // NOTE(review): Java deserialization of data from the network is unsafe if the
        // peer is not trusted.
        ObjectInputStream objectInputStream = new ObjectInputStream(connect.getInputStream());
        String serverName = (String) objectInputStream.readObject();
        String methodName = (String) objectInputStream.readObject();
        // A no-arg method arrives with args == null; that is a valid request.
        Object[] args = (Object[]) objectInputStream.readObject();

        Object response = "can not find provid";
        if (serverName != null && methodName != null) {
            Class<?> provider = RPCMain.getServiceMap().get(serverName);
            // An unknown service gets the "not found" reply instead of an NPE and no reply.
            if (provider != null) {
                response = invokeProvider(provider, methodName, args);
            }
        }
        ObjectOutputStream objectOutputStream = new ObjectOutputStream(connect.getOutputStream());
        objectOutputStream.writeObject(response);
        objectOutputStream.flush();
    }

    /**
     * Invokes the first public method matching the name and argument count on a new provider
     * instance. Returns the empty string when no method matches.
     */
    private static Object invokeProvider(Class<?> provider, String methodName, Object[] args) throws Throwable {
        int argCount = (args == null) ? 0 : args.length;
        for (Method method : provider.getMethods()) {
            // Match on arity as well so an overload with a different parameter count is not called.
            if (methodName.equals(method.getName()) && method.getParameterCount() == argCount) {
                return new ServerInterfaceFactory(provider.newInstance()).invoke(provider, method, args);
            }
        }
        return "";
    }
}
ClientServer
创建ClientServer类,该类使用netty,与服务端建立连接(此处默认使用9999端口,也可以通过注解配置的方式),并在启动时将自身对外提供的服务(不管有没有)发送给服务端。
package com.simple.rpc.register;
import com.simple.util.entiy.TCPConfig;
import com.simple.util.serializer.DecoderSerializer;
import com.simple.util.serializer.EncoderSerializer;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.util.Map;
/**
 * Netty client that connects to the registry, publishes this process's service table and
 * then stays connected until the channel closes.
 *
 * @author huangjun
 * @version 1.0
 */
public class ClientServer {
    private final static Integer NETTYPORT = 9999;

    /**
     * Connects to the registry at {@code ip} on port 9999 and sends {@code tcpConfigMap}.
     * Blocks the calling thread until the channel is closed.
     *
     * @param ip           registry address
     * @param tcpConfigMap services offered by this process (may be empty)
     */
    public static void run(String ip, Map<String, TCPConfig> tcpConfigMap) {
        NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<Channel>() {
                        @Override
                        protected void initChannel(Channel ch) throws Exception {
                            ch.pipeline().addLast(new EncoderSerializer());
                            ch.pipeline().addLast(new DecoderSerializer());
                            ch.pipeline().addLast(new ClientServerHandler());
                        }
                    });
            System.out.println("客户端准备完毕");
            Channel channel = bootstrap.connect(ip, NETTYPORT).sync().channel();
            // writeAndFlush already flushes; no separate flush() is needed.
            channel.writeAndFlush(tcpConfigMap);
            channel.closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
        } finally {
            eventLoopGroup.shutdownGracefully();
        }
    }
}
ClientServerHandler
创建ClientServerHandler类,用于接收netty服务端返回的消息。当服务启动之后,服务端会返回服务端所保有的所有服务列表,本地将服务列表保存到内存,在需要时直接查询对应的服务进行调用。
package com.simple.rpc.register;
import com.simple.rpc.main.RPCMain;
import com.simple.util.entiy.TCPConfig;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.Map;
/**
 * Inbound handler on the registry connection: merges each service table received from the
 * registry into the local table held by {@code RPCMain}.
 *
 * @author huangjun
 * @version 1.0
 */
@ChannelHandler.Sharable
public class ClientServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * Treats {@code msg} as a map of service key to {@code TCPConfig} and adds every entry
     * to the local table.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Map<?, ?> received = (Map<?, ?>) msg;
        Map<String, TCPConfig> localTable = RPCMain.getTcpMap();
        // Add the services announced by the registry to the local table.
        for (Map.Entry<?, ?> announced : received.entrySet()) {
            localTable.put(announced.getKey().toString(), (TCPConfig) announced.getValue());
        }
        RPCMain.setTcpMap(localTable);
    }
}
RPCMain
创建RPCMain类,该类是整个RPC启动时所执行的流程
package com.simple.rpc.main;
import com.simple.rpc.annotations.EnableRPC;
import com.simple.rpc.annotations.RegisterService;
import com.simple.rpc.annotations.StratRPC;
import com.simple.rpc.config.ProxyConfig;
import com.simple.rpc.register.ClientServer;
import com.simple.util.entiy.TCPConfig;
import org.reflections.Reflections;
import org.reflections.scanners.FieldAnnotationsScanner;
import org.reflections.scanners.SubTypesScanner;
import org.reflections.scanners.TypeAnnotationsScanner;
import org.reflections.util.ClasspathHelper;
import org.reflections.util.ConfigurationBuilder;
import org.springframework.beans.BeansException;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import java.lang.reflect.Method;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.logging.Logger;
/**
 * Startup sequence of the RPC framework. Runs once after the Spring context is ready:
 * reads {@code @EnableRPC} / {@code @StratRPC} from the application's main class, injects
 * consumer proxies, starts the provider listener and connects to the registry.
 *
 * @author huangjun
 * @version 1.0
 */
@Component
@Order(1)
public class RPCMain implements CommandLineRunner, ApplicationContextAware {
    private static final Logger logger = Logger.getLogger(RPCMain.class.getName());
    private static String registerPackage;
    private static String hostAddress;
    private static Integer serverPort;
    private static String registerIp = null;
    private static Map<String,Class<?>> serviceMap = new HashMap<>();
    private static Map<String, TCPConfig> tcpMap = new HashMap<>();
    private static ApplicationContext context;

    /**
     * Finds the class whose {@code main} method is on the current call stack.
     *
     * @return the class name, or the empty string if no {@code main} frame is found
     */
    private static String getMainClassName() {
        StackTraceElement[] stackTraceElements = new RuntimeException().getStackTrace();
        for (StackTraceElement stackTraceElement : stackTraceElements) {
            if ("main".equals(stackTraceElement.getMethodName())) {
                return stackTraceElement.getClassName();
            }
        }
        return "";
    }

    /**
     * Returns {@code true} when {@code value} contains nothing but whitespace.
     */
    private static boolean isBlank(String value) {
        return value.replaceAll("\\s", "").isEmpty();
    }

    /**
     * Scans {@code registerPackage} for {@code @RegisterService} classes and records, for every
     * method of every interface they implement, the implementing class and this host's address.
     */
    private void initTcpServerClass(){
        // Only the configured package is scanned.
        Reflections reflections = new Reflections(new ConfigurationBuilder()
                .setUrls(ClasspathHelper.forPackage(registerPackage))
                .setScanners(new FieldAnnotationsScanner(),new TypeAnnotationsScanner(),new SubTypesScanner(false)));
        Set<Class<?>> serverClass = reflections.getTypesAnnotatedWith(RegisterService.class);
        for (Class<?> aClass : serverClass) {
            RegisterService service = aClass.getAnnotation(RegisterService.class);
            if (service == null) {
                continue;
            }
            for (Class<?> anInterface : aClass.getInterfaces()) {
                for (Method method : anInterface.getMethods()) {
                    // Key format: "<canonical interface name>.<method name>".
                    String name = anInterface.getCanonicalName() + "." + method.getName();
                    serviceMap.put(name, aClass);
                    TCPConfig tcpConfig = new TCPConfig(name, hostAddress, serverPort, Boolean.TRUE);
                    tcpMap.put(name, tcpConfig);
                }
            }
        }
    }

    /**
     * Resolves the local host address into {@code hostAddress}.
     */
    private void getHost() {
        try {
            InetAddress addr = InetAddress.getLocalHost();
            hostAddress = addr.getHostAddress();
        } catch (UnknownHostException e) {
            // Keep the cause so the original failure stays visible.
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    public static Map<String, TCPConfig> getTcpMap() {
        return tcpMap;
    }

    public static void setTcpMap(Map<String, TCPConfig> tcpMap) {
        RPCMain.tcpMap = tcpMap;
    }

    public static Map<String, Class<?>> getServiceMap() {
        return serviceMap;
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        context = applicationContext;
    }

    /**
     * Executes the startup sequence described on the class.
     *
     * @throws RuntimeException if the main class cannot be loaded or an annotation attribute
     *                          is missing or inconsistent
     */
    @Override
    public void run(String... args) throws Exception {
        String mainClassName = getMainClassName();
        Class<?> aClass;
        try {
            aClass = ClassLoader.getSystemClassLoader().loadClass(mainClassName);
        } catch (ClassNotFoundException e) {
            throw new RuntimeException("can not find main class", e);
        }

        EnableRPC enabletcp = aClass.getAnnotation(EnableRPC.class);
        if (enabletcp != null){
            String referencePackage = enabletcp.referencePackage();
            if (isBlank(referencePackage)){
                throw new RuntimeException("@com.simple.rpc.annotations.EnableRPC.referencePackage can not be empty");
            }
            String clientRegisterIp = enabletcp.clientRegisterIp();
            if (isBlank(clientRegisterIp)){
                throw new RuntimeException("@com.simple.rpc.annotations.EnableRPC.clientRegisterIp can not be empty");
            }
            registerIp = clientRegisterIp;
            // Inject proxies into the consumer's @Reference fields.
            ProxyConfig.initReference(context, referencePackage);
        }

        StratRPC annotation = aClass.getAnnotation(StratRPC.class);
        // The provider side is started only when the main class carries @StratRPC.
        if (annotation != null){
            registerPackage = annotation.registerPackage();
            if (isBlank(registerPackage)){
                throw new RuntimeException("@com.simple.rpc.annotations.StratRPC.registerPackage can not be empty");
            }
            String serverRegisterIp = annotation.serverRegisterIp();
            if (isBlank(serverRegisterIp)){
                throw new RuntimeException("@com.simple.rpc.annotations.StratRPC.serverRegisterIp can not be empty");
            }
            serverPort = annotation.serverPort();
            if (serverPort == 0){
                throw new RuntimeException("@com.simple.rpc.annotations.StratRPC.serverPort can not be empty");
            }
            if (serverPort == 9999){
                throw new RuntimeException("Regiter center has use this port.Please use other port");
            }
            // When both annotations are present they must name the same registry. The original
            // condition was inverted and rejected exactly the valid configuration. The check
            // runs before the listener starts so a bad configuration leaves no thread behind.
            if (registerIp != null && !serverRegisterIp.equals(registerIp)){
                throw new RuntimeException("@com.simple.rpc.annotations.StratRPC.serverRegisterIp must equals @com.simple.rpc.annotations.EnableRPC.clientRegisterIp");
            }
            getHost();
            // Collect the service implementations to expose.
            initTcpServerClass();
            // Start the listener thread for consumer requests.
            Execute.startTpcServer(hostAddress, serverPort);
            registerIp = serverRegisterIp;
        }

        if (registerIp == null) {
            // Neither annotation is present: there is no registry to talk to.
            logger.warning("Neither @EnableRPC nor @StratRPC found on " + mainClassName + "; RPC is not started");
            return;
        }
        // Connect to the registry to publish / subscribe services.
        ClientServer.run(registerIp, tcpMap);
    }
}
我们知道,Springboot不会在启动时自动加载第三方jar包里的配置信息,要么在代码里显式地将第三方jar包的配置注入到容器,要么使用spring.factories文件。为了尽量达到无侵入,这里使用spring.factories
在resource文件夹下新建META-INF文件夹,在该文件夹下新建spring.factories指定核心启动类的包路径
org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.simple.rpc.main.RPCMain
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-EjxzPkNW-1658301035643)(E:\study\dubbo\image\微信截图_20220719170921.png)]
由于netty传输数据时需要经过编码(这个是坑),这里创建三个处理netty传输服务列表信息的编码与解码类。netty客户端和netty服务端都必须用同一套编码工具类
DecoderSerializer
解码
package com.simple.util.serializer;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
/**
 * Inbound decoder: turns the bytes currently readable in the buffer into one Java object.
 *
 * <p>NOTE(review): no message framing is visible here; all readable bytes are handed to the
 * deserializer at once — confirm that messages always arrive whole.
 *
 * @author huangjun
 * @version 1.0
 */
public class DecoderSerializer extends ByteToMessageDecoder {
    private static final int LENGTH = 4;

    /**
     * Deserializes the readable bytes of {@code in} and appends the resulting object to
     * {@code out}. Does nothing while {@code LENGTH} bytes or fewer are readable.
     */
    @Override
    protected void decode(ChannelHandlerContext arg0, ByteBuf in, List<Object> out) throws Exception {
        if (in.readableBytes() <= DecoderSerializer.LENGTH) {
            return;
        }
        byte[] payload = JavaSerializer.getByteFromBuf(in);
        out.add(JavaSerializer.decode(payload));
    }
}
EncoderSerializer
编码
package com.simple.util.serializer;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
/**
 * Outbound encoder: serializes an object with {@code JavaSerializer} and writes the bytes
 * to the outgoing buffer.
 *
 * @author huangjun
 * @version 1.0
 */
public class EncoderSerializer extends MessageToByteEncoder<Object> {

    /**
     * Writes the serialized form of {@code in} into {@code out}.
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) throws Exception {
        out.writeBytes(JavaSerializer.encodes(in));
    }
}
JavaSerializer
序列化方式
package com.simple.util.serializer;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.io.*;
/**
 * Java-serialization helpers shared by the encoder and the decoder.
 *
 * @author huangjun
 * @version 1.0
 */
public class JavaSerializer {

    /**
     * Serializes {@code obj} to a byte array.
     *
     * @param obj the object to serialize
     * @return the serialized bytes
     * @throws IOException if serialization fails
     */
    static byte[] encodes(Object obj) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try (ObjectOutputStream writer = new ObjectOutputStream(sink)) {
            writer.writeObject(obj);
            writer.flush();
        }
        return sink.toByteArray();
    }

    /**
     * Deserializes one object from {@code bytes}.
     *
     * @param bytes serialized form of a single object
     * @return the deserialized object
     * @throws IOException            if the stream is malformed
     * @throws ClassNotFoundException if the object's class is not available
     */
    static Object decode(byte[] bytes) throws IOException, ClassNotFoundException {
        ObjectInputStream reader = new ObjectInputStream(new ByteArrayInputStream(bytes));
        return reader.readObject();
    }

    /**
     * Copies a byte array into a new buffer.
     *
     * @param bytes source bytes
     * @return a buffer holding a copy of {@code bytes}
     */
    public ByteBuf getBufFromByte(byte[] bytes) {
        return Unpooled.copiedBuffer(bytes);
    }

    /**
     * Reads all currently readable bytes out of {@code buf}.
     *
     * @param buf source buffer
     * @return the bytes that were readable
     */
    static byte[] getByteFromBuf(ByteBuf buf) {
        byte[] drained = new byte[buf.readableBytes()];
        buf.readBytes(drained);
        return drained;
    }
}
至此,基本上已经完成了简易的rpc框架,该RPC框架关键在于:
但还是缺少注册中心来实现处理服务列表的问题。
代码已上传到GitHub:https://github.com/simple-mine/rpc
创建一个独立的Springboot项目
RegisterServer
创建RegisterServer类,定义好服务端的netty信息
package com.simple.register.register;
import com.simple.util.serializer.DecoderSerializer;
import com.simple.util.serializer.EncoderSerializer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
 * Netty server of the registry.
 *
 * @author huangjun
 * @version 1.0
 */
@Slf4j
@Component
public class RegisterServer {

    /**
     * Binds the registry to {@code port} and blocks until the server channel closes.
     *
     * @param port port to listen on
     * @throws Exception if binding or waiting fails
     */
    public void run(int port) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        log.info("准备运行端口:{}" , port);
        try {
            // Pipeline for each accepted connection: decode, encode, then the registry handler.
            ChannelInitializer<Channel> pipelineSetup = new ChannelInitializer<Channel>() {
                @Override
                protected void initChannel(Channel ch) throws Exception {
                    ch.pipeline().addLast(new DecoderSerializer());
                    ch.pipeline().addLast(new EncoderSerializer());
                    ch.pipeline().addLast(new RegisterServerHandler());
                }
            };

            ServerBootstrap server = new ServerBootstrap();
            server.group(bossGroup, workerGroup);
            server.channel(NioServerSocketChannel.class);
            server.option(ChannelOption.SO_BACKLOG, 128);
            server.childOption(ChannelOption.SO_KEEPALIVE, true);
            server.childHandler(pipelineSetup);

            // Bind and wait until the bind has completed.
            ChannelFuture bound = server.bind(port).sync();
            log.info("注册中心已启动");
            // Wait until the listening channel is closed.
            bound.channel().closeFuture().sync();
        } finally {
            // Release the event-loop threads on the way out.
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}
RegisterServerHandler
创建RegisterServerHandler类,监听netty客户端发来的消息。netty客户端发来的消息都是服务列表,需要对服务列表进行广播,通知其他netty客户端。当某个netty客户端断开连接时,需要及时广播通知其他netty客户端新的服务列表
package com.simple.register.register;
import com.simple.util.entiy.TCPConfig;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.socket.ChannelInputShutdownEvent;
import io.netty.channel.socket.ChannelInputShutdownReadComplete;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.concurrent.GlobalEventExecutor;
import org.springframework.stereotype.Component;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;
import java.util.stream.Collectors;
/**
 * Registry-side handler. Each client sends its service table; the handler broadcasts it to
 * the other connected clients, replies with the services already known, and broadcasts the
 * remaining services again when a client disconnects.
 *
 * @author huangjun
 * @version 1.0
 */
@Component
@ChannelHandler.Sharable
public class RegisterServerHandler extends ChannelInboundHandlerAdapter {
    private static final Logger logger = Logger.getLogger(RegisterServerHandler.class.getName());
    // All connected channels.
    private static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    // Service tables of all connected clients, keyed by the client's remote address string.
    private static Map<String, Map<String, TCPConfig>> allServerMap = new HashMap<>();

    public static Map<String, Map<String, TCPConfig>> getAllServerMap() {
        return allServerMap;
    }

    /**
     * Merges the service tables of all registered clients into one new map.
     */
    private static Map<String, TCPConfig> mergeAllServices() {
        Map<String, TCPConfig> merged = new HashMap<>();
        for (Map<String, TCPConfig> value : allServerMap.values()) {
            merged.putAll(value);
        }
        return merged;
    }

    /**
     * Handles a service table sent by a client: forwards it to every other client, answers
     * the sender with the currently known services, then stores the sender's table.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Channel inComing = ctx.channel();
        try {
            if (msg != null) {
                @SuppressWarnings("unchecked") // clients send Map<String, TCPConfig> by protocol
                Map<String, TCPConfig> map = (Map<String, TCPConfig>) msg;
                for (Channel channel : channels) {
                    if (channel != inComing) {
                        if (!map.isEmpty()) {
                            // Announce the new services to the already registered clients.
                            channel.writeAndFlush(map);
                        }
                    } else {
                        // Tell the new client which services already exist.
                        channel.writeAndFlush(mergeAllServices());
                    }
                }
                ctx.flush();
                // Remember the sender's services.
                allServerMap.put(inComing.remoteAddress().toString(), map);
            }
        } catch (Exception e) {
            // closeFuture().sync() only waits for a close and must not be called on the event
            // loop; actually close the broken connection instead.
            logger.warning("Failed to process message from " + inComing.remoteAddress() + ": " + e);
            ctx.close();
        }
    }

    /**
     * Removes the disconnected client and broadcasts the remaining services.
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().remoteAddress().toString()+"退出注册");
        channels.remove(ctx.channel());
        allServerMap.remove(ctx.channel().remoteAddress().toString());
        // Clients expect a Map<String, TCPConfig>; HashMap.values() is neither a Map nor
        // serializable, so send the merged table.
        Map<String, TCPConfig> remaining = mergeAllServices();
        for (Channel channel : channels) {
            channel.writeAndFlush(remaining);
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        channels.add(ctx.channel());
        System.out.println(ctx.channel().remoteAddress() + "注册");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("与客户端[" + ctx.channel().remoteAddress() + "]通信发生异常:");
        cause.printStackTrace();
        ctx.close();
    }

    /**
     * Closes the connection on read/write idle events and when the remote side has shut down
     * its output. Closing is what triggers {@link #channelInactive(ChannelHandlerContext)}.
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        Channel channel = ctx.channel();
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent e = (IdleStateEvent) evt;
            if (e.state() == IdleState.READER_IDLE) {
                logger.info(channel.remoteAddress() + "---No data was received for a while ,read time out... ...");
                ctx.close();
            } else if (e.state() == IdleState.WRITER_IDLE) {
                logger.info(channel.remoteAddress() + "---No data was sent for a while.write time out... ...");
                ctx.close();
            }
        } else if (evt instanceof ChannelInputShutdownReadComplete) {
            // The remote host closed the connection.
            ctx.close();
        }
    }
}
在启动类上启动netty服务端
package com.simple.register;
import com.simple.register.register.RegisterServer;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import javax.annotation.Resource;
/**
 * Spring Boot entry point of the registry; starts the Netty server once the context is up.
 */
@SpringBootApplication
public class RegisterApplication implements CommandLineRunner {

    /** Port the registry listens on. */
    private static final int REGISTER_PORT = 9999;

    @Resource
    private RegisterServer registerServer;

    public static void main(String[] args) {
        SpringApplication.run(RegisterApplication.class, args);
    }

    /**
     * Starts the registry server on port 9999.
     */
    @Override
    public void run(String... args) throws Exception {
        registerServer.run(REGISTER_PORT);
    }
}
注册中心关键的,是在于对于服务列表的管理,合理快速的分发服务列表给到每个客户端
上面已经完成了一个简单的rpc使用,现在演示一下实际使用(实际使用与dubbo十分相似)
创建一个Maven项目,该项目中定义公共接口

HelloService
package com.simple.api.service;
/**
 * Public service contract shared by the provider and the consumer.
 *
 * @author huangjun
 * @version 1.0
 */
public interface HelloService {

    /**
     * @param world text passed in by the caller
     * @return the greeting produced by the implementation
     */
    String sayHello(String world);
}
创建一个Springboot项目,并引入简易RPC依赖和公共接口依赖

pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.1</version>
<relativePath/>
</parent>
<groupId>com.simple</groupId>
<artifactId>server</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>server</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.simple</groupId>
<artifactId>rpc</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.simple</groupId>
<artifactId>api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
使用@RegisterService注解表明需要代理的接口实现类
HelloServiceImpl
package com.simple.server.service.impl;
import com.simple.api.service.HelloService;
import com.simple.rpc.annotations.RegisterService;
/**
 * Provider-side implementation of {@code HelloService}, exposed through the RPC framework.
 *
 * @author huangjun
 * @version 1.0
 */
@RegisterService(name = "helloService")
public class HelloServiceImpl implements HelloService {

    /** Text placed in front of the caller's input. */
    private static final String PREFIX = "sayHello ==>";

    /**
     * @param world text supplied by the caller
     * @return the prefix followed by {@code world}
     */
    @Override
    public String sayHello(String world) {
        return PREFIX + world;
    }
}
启动类上声明@StratRPC注解,并设置好参数
ServerApplication
package com.simple.server;
import com.simple.rpc.annotations.StratRPC;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Spring Boot entry point of the demo provider. {@code @StratRPC} names the package holding
 * the service implementations, the listening port and the registry address.
 */
@StratRPC(registerPackage = "com.simple.server.service.impl",serverPort = 7777,serverRegisterIp = "127.0.0.1")
@SpringBootApplication
public class ServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ServerApplication.class, args);
    }
}
创建一个springboot项目,并引入简易RPC依赖和公共接口

pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.1</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.simple</groupId>
<artifactId>client</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>client</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- 简易PRC -->
<dependency>
<groupId>com.simple</groupId>
<artifactId>rpc</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<!-- 公共接口 -->
<dependency>
<groupId>com.simple</groupId>
<artifactId>api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.aspectj/aspectjweaver -->
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
<version>1.9.5</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
HelloController
使用@Reference注解表明需要进行远程调用的接口(成员变量)
package com.simple.client.controller;
import com.simple.api.service.HelloService;
import com.simple.rpc.annotations.Reference;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * Demo REST controller on the consumer side; forwards requests to the remote
 * {@code HelloService}.
 *
 * @author huangjun
 * @version 1.0
 */
@RestController
@RequestMapping("/hello")
public class HelloController {

    /** Assigned a remote-call proxy by the RPC framework at startup. */
    @Reference(name = "helloService")
    private HelloService helloService;

    /**
     * Handles {@code GET /hello/say}.
     *
     * @param world request parameter passed on to the remote service
     * @return the remote service's reply
     */
    @GetMapping("/say")
    public String sayHello(String world){
        final String reply = helloService.sayHello(world);
        return reply;
    }
}
启动类上声明@EnableRPC注解,并设置好参数
ClientApplication
package com.simple.client;
import com.simple.rpc.annotations.EnableRPC;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Spring Boot entry point of the demo consumer. {@code @EnableRPC} names the package scanned
 * for {@code @Reference} fields and the registry address.
 */
@SpringBootApplication
@EnableRPC(referencePackage = "com.simple.client.controller",clientRegisterIp = "127.0.0.1")
public class ClientApplication {

    public static void main(String[] args) {
        SpringApplication.run(ClientApplication.class, args);
    }
}
结果:

调用成功
理解了Dubbo的执行流程之后,手写一套RPC框架其实并不难。有了主要理论依据,剩下的就是如何获取代理对象,如何注入或者赋值代理对象,如何管理服务列表等等一些基础且核心的东西了