• Java实现基于Socket的负载均衡代理服务器(含六种负载均衡算法)


    目录

    前言

    一、常见负载均衡算法

    1.完全轮询算法

    2.加权轮询算法

    3.完全随机算法

    4.加权随机算法

    5.余数Hash算法

    6.一致性Hash算法

    二、代码实现

    1.项目结构

    2.代码实现

    总结


    前言

    负载均衡建立在现有网络结构之上,它提供了一种廉价有效透明的方法扩展网络设备服务器的带宽、增加吞吐量、加强网络数据处理能力、提高网络的灵活性和可用性。

    负载均衡(Load Balance)其意思就是分摊到多个操作单元上进行执行,例如Web服务器FTP服务器企业关键应用服务器和其它关键任务服务器等,从而共同完成工作任务。

    解释来源:百度百科


    一、常见负载均衡算法

    1.完全轮询算法

    完全轮询就如字面意思一样,每个节点轮流处理请求,相当于一个指针在节点数组中循环遍历

    2.加权轮询算法

    加权轮询在完全轮询的基础上为每个服务器增加了权重,权重高的会处理更多的请求,这解决了不同节点配置不同的情况引发的分配不均问题,相当于节点数组中每个节点有对应权重数量的个数,指针循环遍历,可以理解为在线性区间内增加节点的线段长

    3.完全随机算法

    请求随机打到一个节点上进行处理,相当于在节点数组的长度内产生一个随机数,随机数所对应索引的节点为被选中节点

    4.加权随机算法

    在完全随机算法的基础上为节点配置权重,与加权轮询的原理类似,可认为调整节点在线性区间内的线段长度以增加被选中的概率

    5.余数Hash算法

    以上算法都不能解决一个缓存和session问题,因为同一个客户端的不同请求几乎每次都是不同的服务来处理的,这样客户的缓存和session等就会丢失,需要重构,所以可以对客户端ip进行Hash处理,之后对服务器数量取余,这样可以客户端的请求是同一个服务器来处理

    6.一致性Hash算法

    余数Hash虽然解决了客户端的请求到不同服务器的问题,但是当某个或某些服务器下线或上线的情况,几乎会为大多数客户端重新分配服务器,这样导致系统造成不稳定性,一致性Hash定义一个一致性Hash环,为节点计算出Hash值之后放入环内,当客户端请求时,为客户端计算Hash值,在一致性Hash环中沿一个特定的方向寻找离它最近的节点,这样即使遇到服务器上线会下线也不会对大多数服务器产生影响,当然我们可以为节点创建虚拟节点来更均匀合理的将他们分配到Hash环中

    二、代码实现

    1.项目结构

    ├── pom.xml
    └── src
        ├── main
        │   ├── java
        │   │   ├── Main.java   启动类,ServerSocket绑定监听端口接收请求
        │   │   └── system
        │   │       ├── common
        │   │       │   ├── ConnectUtil.java    连接工具类,可测试是否与指定IP端口连接成功
        │   │       │   └── GetHashCode.java   重计算Hash值工具类
        │   │       ├── configure
        │   │       │   └── Configuration.java   配置类,解析xml配置文件并封装为配置类
        │   │       ├── entity
        │   │       │   └── Server.java   服务器类,包含serverName,ip,port,wight属性
        │   │       ├── random
        │   │       │   ├── BalanceService.java   负载均衡接口,包含获取server,增加server,删除server方法
        │   │       │   └── imp
        │   │       │       ├── ConsistentHashServerImpl.java  一致性Hash负载均衡实现类
        │   │       │       ├── HashServerImpl.java   余数Hash负载均衡实现类
        │   │       │       ├── PollServerImpl.java  完全轮询负载均衡实现类
        │   │       │       ├── RandomServerImpl.java   完全随机负载均衡实现类
        │   │       │       ├── ServerMonitorImpl.java   服务监视器,装饰者模式,为其他实现类增加服务监控、动态增减服务器功能
        │   │       │       ├── WeightPollServerImpl.java   加权轮询负载均衡实现类
        │   │       │       └── WeightRandomServerImpl.java   加权随机负载均衡实现类
        │   │       └── socket
        │   │           └── SocketThread.java   客户端Socket请求线程,每个客户端请求对应一个线程对象,提交到线程池
        │   └── resources
        │       ├── log4j.properties   日志配置文件
        │       └── xw-load-balancing.xml   项目配置文件
        └── test
            └── java

    2.代码实现

    xml配置文件示例

    1. <configuration>
    2. <servers>
    3. <server name="sever1" address="127.0.0.1" port="8083" weight="1"/>
    4. <server name="sever2" address="127.0.0.1" port="8082" weight="2"/>
    5. <server name="sever3" address="127.0.0.1" port="8081" weight="2"/>
    6. <server name="sever4" address="127.0.0.1" port="8080" weight="1"/>
    7. servers>
    8. <settings>
    9. <setting name="vnnNodeCount" value="3"/>
    10. <setting name="random" value="ConsistentHash"/>
    11. <setting name="openServerMonitor" value="true"/>
    12. <setting name="port" value="8088"/>
    13. settings>
    14. configuration>

     Configuration.java

    利用Dom4j解析xml配置文件,将配置里面的内容解析,将Server节点封装成Server对象list存储,根据选择的负载均衡算法来注册对应的实现类,如果开启服务监控则使用装饰器加强

    1. package system.configure;
    2. import org.dom4j.Document;
    3. import org.dom4j.DocumentException;
    4. import org.dom4j.Element;
    5. import org.dom4j.io.SAXReader;
    6. import system.entity.Server;
    7. import system.random.BalanceService;
    8. import system.random.imp.*;
    9. import java.io.File;
    10. import java.util.ArrayList;
    11. import java.util.List;
    12. /**
    13. * 配置类
    14. *
    15. * @author xuwei
    16. * @date 2022/07/18 11:30
    17. **/
    18. public class Configuration {
    19. private volatile static Configuration configuration;
    20. private BalanceService balanceService;
    21. private Integer port;
    22. private Configuration(String fileName) {
    23. File file = new File(fileName);
    24. if (file.exists()) {
    25. SAXReader reader = new SAXReader();
    26. List serverList = new ArrayList<>();
    27. Document document = null;
    28. try {
    29. document = reader.read(file);
    30. } catch (DocumentException e) {
    31. e.printStackTrace();
    32. }
    33. Integer vnnNodeCount = 3;
    34. assert document != null;
    35. Element root = document.getRootElement();
    36. List childElements = root.elements();
    37. for (Element child : childElements) {
    38. if (!child.elements().isEmpty()) {
    39. for (Element c : child.elements()) {
    40. switch (child.getName()) {
    41. case "servers":
    42. serverList.add(new Server(c.attributeValue("name"), c.attributeValue("address"), Integer.valueOf("".equals(c.attributeValue("port")) ? "80" : c.attributeValue("port")), Integer.valueOf("".equals(c.attributeValue("weight")) ? "0" : c.attributeValue("weight"))));
    43. break;
    44. case "settings":
    45. switch (c.attributeValue("name")) {
    46. case "port":
    47. this.port = Integer.valueOf(c.attributeValue("value") == null ? "8088" : "".equals(c.attributeValue("value")) ? "8088" : c.attributeValue("value"));
    48. break;
    49. case "vnnNodeCount":
    50. vnnNodeCount = c.attributeValue("value") == null ? vnnNodeCount : "".equals(c.attributeValue("value")) ? vnnNodeCount : Integer.valueOf(c.attributeValue("value"));
    51. break;
    52. case "random":
    53. String random = c.attributeValue("value") == null ? "RandomServer" : "".equals(c.attributeValue("value")) ? "RandomServer" : c.attributeValue("value");
    54. switch (random) {
    55. case "WeightRandomServer":
    56. balanceService = new WeightRandomServerImpl(serverList);
    57. break;
    58. case "PollServer":
    59. balanceService = new PollServerImpl(serverList);
    60. break;
    61. case "WeightPollServer":
    62. balanceService = new WeightPollServerImpl(serverList);
    63. break;
    64. case "HashServer":
    65. balanceService = new HashServerImpl(serverList);
    66. break;
    67. case "ConsistentHash":
    68. balanceService = new ConsistentHashServerImpl(serverList, vnnNodeCount);
    69. break;
    70. case "RandomServer":
    71. default:
    72. balanceService = new RandomServerImpl(serverList);
    73. break;
    74. }
    75. case "openServerMonitor":
    76. if ("true".equals(c.attributeValue("value"))) {
    77. balanceService = new ServerMonitorImpl(balanceService);
    78. }
    79. break;
    80. default:
    81. break;
    82. }
    83. break;
    84. default:
    85. break;
    86. }
    87. }
    88. }
    89. }
    90. }
    91. }
    92. public static Configuration getConfiguration(String fileName) {
    93. if (configuration == null) {
    94. synchronized (Configuration.class) {
    95. if (configuration == null) {
    96. configuration = new Configuration(fileName);
    97. }
    98. }
    99. }
    100. return configuration;
    101. }
    102. public BalanceService getBalanceService() {
    103. return balanceService;
    104. }
    105. public Integer getPort() {
    106. return port;
    107. }
    108. }

    Server.java

    包含serverName、address、port、weight属性,存储节点信息

    1. package system.entity;
    2. /**
    3. * @ClassName Server
    4. * @Author xuwei
    5. * @DATE 2022/4/11
    6. */
    7. public class Server {
    8. private String serverName;
    9. private String address;
    10. private Integer port;
    11. private Integer weight;
    12. public Server() {
    13. }
    14. public Server(String serverName, String address, Integer port, Integer weight) {
    15. this.serverName = serverName;
    16. this.address = address;
    17. this.port = port;
    18. this.weight = weight;
    19. }
    20. public String getServerName() {
    21. return serverName;
    22. }
    23. public String getAddress() {
    24. return address;
    25. }
    26. public Integer getPort() {
    27. return port;
    28. }
    29. public Integer getWeight() {
    30. return weight;
    31. }
    32. @Override
    33. public String toString() {
    34. return "Server{" +
    35. "serverName='" + serverName + '\'' +
    36. ", address='" + address + '\'' +
    37. ", port=" + port +
    38. ", weight=" + weight +
    39. '}';
    40. }
    41. }

    SocketThread.java

    Socket处理线程,根据节点信息创建远程连接,将客户端数据与远程服务器数据相互转发,实现代理

    1. package system.socket;
    2. import org.apache.log4j.Logger;
    3. import java.io.IOException;
    4. import java.io.InputStream;
    5. import java.io.OutputStream;
    6. import java.net.InetSocketAddress;
    7. import java.net.Socket;
    8. /**
    9. * @ClassName SocketThread
    10. * @Author xuwei
    11. * @DATE 2022/4/12
    12. */
    13. public class SocketThread extends Thread {
    14. /**
    15. * 五分钟超时
    16. */
    17. public static final int SO_TIME_OUT = 300000;
    18. private static final int BUFFER_SIZE = 8092;
    19. private static final Logger log = Logger.getLogger(SocketThread.class);
    20. private final Socket localSocket;
    21. private final String remoteHost;
    22. private final Integer remotePort;
    23. private Socket remoteSocket;
    24. private InputStream remoteSocketInputStream;
    25. private OutputStream localSocketOutputStream;
    26. public SocketThread(Socket socket, String remoteHost, Integer remotePort) {
    27. this.localSocket = socket;
    28. this.remoteHost = remoteHost;
    29. this.remotePort = remotePort;
    30. }
    31. @Override
    32. public void run() {
    33. try {
    34. remoteSocket = new Socket();
    35. remoteSocket.connect(new InetSocketAddress(remoteHost, remotePort));
    36. //设置超时,超过时间未收到客户端请求,关闭资源
    37. //5分钟内无数据传输、关闭链接
    38. remoteSocket.setSoTimeout(SO_TIME_OUT);
    39. remoteSocketInputStream = remoteSocket.getInputStream();
    40. OutputStream remoteSocketOutputStream = remoteSocket.getOutputStream();
    41. InputStream localSocketInputStream = localSocket.getInputStream();
    42. localSocketOutputStream = localSocket.getOutputStream();
    43. new ReadThread().start();
    44. //写数据,负责读取客户端发送过来的数据,转发给远程
    45. dataTransmission(localSocketInputStream, remoteSocketOutputStream);
    46. } catch (Exception e) {
    47. log.warn(e);
    48. } finally {
    49. close();
    50. }
    51. }
    52. private void dataTransmission(InputStream inputStream, OutputStream outputStream) throws IOException {
    53. byte[] data = new byte[BUFFER_SIZE];
    54. int len;
    55. while ((len = inputStream.read(data)) > 0) {
    56. /*
    57. 读到了缓存大小一致的数据,不需要拷贝,直接使用
    58. 读到了比缓存大小的数据,需要拷贝到新数组然后再使用
    59. */
    60. if (len == BUFFER_SIZE) {
    61. outputStream.write(data);
    62. } else {
    63. byte[] dest = new byte[len];
    64. System.arraycopy(data, 0, dest, 0, len);
    65. outputStream.write(dest);
    66. }
    67. }
    68. }
    69. /**
    70. * 关闭资源
    71. */
    72. private void close() {
    73. try {
    74. if (remoteSocket != null && !remoteSocket.isClosed()) {
    75. remoteSocket.close();
    76. log.info("remoteSocket ---> " + remoteSocket.getRemoteSocketAddress().toString().replace("/", "") + " socket closed");
    77. }
    78. } catch (IOException e1) {
    79. e1.printStackTrace();
    80. }
    81. try {
    82. if (localSocket != null && !localSocket.isClosed()) {
    83. localSocket.close();
    84. log.info("localSocket ---> " + localSocket.getRemoteSocketAddress().toString().replace("/", "") + " socket closed");
    85. }
    86. } catch (IOException e1) {
    87. log.warn(e1);
    88. }
    89. }
    90. /**
    91. * 读数据线程负责读取远程数据后回写到客户端
    92. */
    93. class ReadThread extends Thread {
    94. @Override
    95. public void run() {
    96. try {
    97. dataTransmission(remoteSocketInputStream, localSocketOutputStream);
    98. } catch (IOException e) {
    99. log.warn(e);
    100. } finally {
    101. close();
    102. }
    103. }
    104. }
    105. }

     ConnectUtil.java

    测试服务是否可用

    1. package system.common;
    2. import org.apache.log4j.Logger;
    3. import java.io.IOException;
    4. import java.net.InetSocketAddress;
    5. import java.net.Socket;
    6. /**
    7. * 连接测试工具类
    8. *
    9. * @author xuwei
    10. * @date 2022/07/18 15:45
    11. **/
    12. public class ConnectUtil {
    13. private static final Logger logger = Logger.getLogger(ConnectUtil.class);
    14. /**
    15. * 测试telnet 机器端口的连通性
    16. *
    17. * @param hostname 地址
    18. * @param port 端口
    19. * @param timeout 超时时间
    20. * @return 是否连通
    21. */
    22. public static boolean telnet(String hostname, int port, int timeout) {
    23. Socket socket = new Socket();
    24. boolean isConnected = false;
    25. try {
    26. socket.connect(new InetSocketAddress(hostname, port), timeout);
    27. isConnected = socket.isConnected();
    28. } catch (IOException ignored) {
    29. logger.warn("Remote server \"" + hostname + ":" + port + "\" connect failed!");
    30. } finally {
    31. try {
    32. socket.close();
    33. } catch (IOException ignored) {
    34. isConnected = false;
    35. logger.warn("Remote server \"" + hostname + ":" + port + "\" connect failed!");
    36. }
    37. }
    38. return isConnected;
    39. }
    40. }

    GetHashCode.java

    重新计算Hash值

    1. package system.common;
    2. /**
    3. * Hash重计算
    4. *
    5. * @author xuwei
    6. * @date 2022/07/18 11:30
    7. **/
    8. public class GetHashCode {
    9. private static final long FNV_32_INIT = 2166136261L;
    10. private static final int FNV_32_PRIME = 16777619;
    11. public static int getHashCode(String origin) {
    12. int hash = (int) FNV_32_INIT;
    13. for (int i = 0; i < origin.length(); i++) {
    14. hash = (hash ^ origin.charAt(i)) * FNV_32_PRIME;
    15. }
    16. hash += hash << 13;
    17. hash ^= hash >> 7;
    18. hash += hash << 3;
    19. hash ^= hash >> 17;
    20. hash += hash << 5;
    21. hash = Math.abs(hash);
    22. return hash;
    23. }
    24. }

     BalanceService.java

    负载均衡接口,包含getServer、addSErver、delServer方法

    1. package system.random;
    2. import system.entity.Server;
    3. /**
    4. * 负载均衡接口
    5. *
    6. * @author xuwei
    7. * @date 2022/07/18 10:41
    8. **/
    9. public interface BalanceService {
    10. /**
    11. * 获取服务器
    12. *
    13. * @param requestNumber 请求量
    14. * @param requestAddress 请求地址
    15. * @return
    16. */
    17. Server getServer(int requestNumber, String requestAddress);
    18. /**
    19. * 添加服务器节点
    20. *
    21. * @param server server
    22. */
    23. void addServerNode(Server server);
    24. /**
    25. * 删除服务器节点
    26. *
    27. * @param server server
    28. */
    29. void delServerNode(Server server);
    30. }

    RandomServerImpl.java

    完全随机负载均衡实现类,将Server放到list中模拟线性区间,生成伪随机数来模拟砸中某个线性区间的场景

    1. package system.random.imp;
    2. import org.apache.log4j.Logger;
    3. import system.entity.Server;
    4. import system.random.BalanceService;
    5. import java.util.Collections;
    6. import java.util.List;
    7. import java.util.Random;
    8. /**
    9. * 完全随机实现类
    10. *
    11. * @author xuwei
    12. * @date 2022/07/18 10:41
    13. **/
    14. public class RandomServerImpl implements BalanceService {
    15. private static final Logger logger = Logger.getLogger(RandomServerImpl.class);
    16. /**
    17. * 服务器列表
    18. */
    19. private final List serverList;
    20. /**
    21. * 伪随机数生成器
    22. */
    23. private final Random random = new Random();
    24. public RandomServerImpl(List serverList) {
    25. this.serverList = Collections.synchronizedList(serverList);
    26. }
    27. /**
    28. * 获取服务器
    29. *
    30. * @param requestNumber
    31. * @param requestAddress
    32. * @return
    33. */
    34. @Override
    35. public Server getServer(int requestNumber, String requestAddress) {
    36. Server server;
    37. if (serverList.isEmpty()) {
    38. logger.warn("Don not have server available!");
    39. return null;
    40. }
    41. server = serverList.get(random.nextInt(serverList.size()));
    42. return server;
    43. }
    44. /**
    45. * 添加服务器节点
    46. *
    47. * @param server server
    48. */
    49. @Override
    50. public void addServerNode(Server server) {
    51. serverList.add(server);
    52. }
    53. /**
    54. * 删除服务器节点
    55. *
    56. * @param server server
    57. */
    58. @Override
    59. public void delServerNode(Server server) {
    60. serverList.removeIf(server1 -> server1.getAddress().equals(server.getAddress()) && server1.getPort().equals(server.getPort()));
    61. }
    62. }

    WeightRandomServerImpl.java

    加权随机负载均衡实现类,在list中放入对应权重数量的server来模拟线性区间内线段增长,以实现概率增加

    1. package system.random.imp;
    2. import org.apache.log4j.Logger;
    3. import system.entity.Server;
    4. import system.random.BalanceService;
    5. import java.util.ArrayList;
    6. import java.util.Collections;
    7. import java.util.List;
    8. import java.util.Random;
    9. /**
    10. * 加权随机实现类
    11. *
    12. * @author xuwei
    13. * @date 2022/07/18 10:41
    14. **/
    15. public class WeightRandomServerImpl implements BalanceService {
    16. private static final Logger logger = Logger.getLogger(WeightRandomServerImpl.class);
    17. /**
    18. * 服务器列表
    19. */
    20. private final List serverList;
    21. /**
    22. * 伪随机数生成器
    23. */
    24. private final Random random = new Random();
    25. public WeightRandomServerImpl(List serverList) {
    26. List servers = new ArrayList<>();
    27. for (Server server : serverList) {
    28. for (int i = 0; i < server.getWeight(); i++) {
    29. servers.add(server);
    30. }
    31. }
    32. this.serverList = Collections.synchronizedList(servers);
    33. }
    34. /**
    35. * 获取服务器
    36. *
    37. * @param requestNumber
    38. * @param requestAddress
    39. * @return
    40. */
    41. @Override
    42. public Server getServer(int requestNumber, String requestAddress) {
    43. Server server;
    44. if (serverList.isEmpty()) {
    45. logger.warn("Don not have server available!");
    46. return null;
    47. }
    48. server = serverList.get(random.nextInt(serverList.size()));
    49. return server;
    50. }
    51. /**
    52. * 添加服务器节点
    53. *
    54. * @param server server
    55. */
    56. @Override
    57. public void addServerNode(Server server) {
    58. for (int i = 0; i < server.getWeight(); i++) {
    59. serverList.add(server);
    60. }
    61. }
    62. /**
    63. * 删除服务器节点
    64. *
    65. * @param server server
    66. */
    67. @Override
    68. public void delServerNode(Server server) {
    69. serverList.removeIf(server1 -> server1.getAddress().equals(server.getAddress()) && server1.getPort().equals(server.getPort()));
    70. }
    71. }

    PollServerImpl.java

    完全轮询负载均衡实现类,指针循环遍历list

    1. package system.random.imp;
    2. import org.apache.log4j.Logger;
    3. import system.entity.Server;
    4. import system.random.BalanceService;
    5. import java.util.Collections;
    6. import java.util.List;
    7. /**
    8. * 简单轮询实现类
    9. *
    10. * @author xuwei
    11. * @date 2022/07/18 10:41
    12. **/
    13. public class PollServerImpl implements BalanceService {
    14. private static final Logger logger = Logger.getLogger(PollServerImpl.class);
    15. /**
    16. * 服务器列表
    17. */
    18. private final List serverList;
    19. public PollServerImpl(List serverList) {
    20. this.serverList = Collections.synchronizedList(serverList);
    21. }
    22. /**
    23. * 获取服务器
    24. *
    25. * @param requestNumber
    26. * @param requestAddress
    27. * @return
    28. */
    29. @Override
    30. public Server getServer(int requestNumber, String requestAddress) {
    31. Server server;
    32. if (serverList.isEmpty()) {
    33. logger.warn("Don not have server available!");
    34. return null;
    35. }
    36. server = serverList.get(requestNumber % serverList.size());
    37. return server;
    38. }
    39. /**
    40. * 添加服务器节点
    41. *
    42. * @param server server
    43. */
    44. @Override
    45. public void addServerNode(Server server) {
    46. serverList.add(server);
    47. }
    48. /**
    49. * 删除服务器节点
    50. *
    51. * @param server server
    52. */
    53. @Override
    54. public void delServerNode(Server server) {
    55. serverList.removeIf(server1 -> server1.getAddress().equals(server.getAddress()) && server1.getPort().equals(server.getPort()));
    56. }
    57. }

    WeightPollServerImpl.java

    加权轮询负载均衡实现类,在完全轮询基础上将服务器在list中的数量增加为权重对应数量

    1. package system.random.imp;
    2. import org.apache.log4j.Logger;
    3. import system.entity.Server;
    4. import system.random.BalanceService;
    5. import java.util.ArrayList;
    6. import java.util.Collections;
    7. import java.util.List;
    8. /**
    9. * 加权轮询实现类
    10. *
    11. * @author xuwei
    12. * @date 2022/07/18 10:41
    13. **/
    14. public class WeightPollServerImpl implements BalanceService {
    15. private static final Logger logger = Logger.getLogger(WeightPollServerImpl.class);
    16. /**
    17. * 服务器列表
    18. */
    19. private final List serverList;
    20. public WeightPollServerImpl(List serverList) {
    21. List servers = new ArrayList<>();
    22. for (Server server : serverList) {
    23. for (int i = 0; i < server.getWeight(); i++) {
    24. servers.add(server);
    25. }
    26. }
    27. this.serverList = Collections.synchronizedList(servers);
    28. }
    29. /**
    30. * 获取服务器
    31. *
    32. * @param requestNumber
    33. * @param requestAddress
    34. * @return
    35. */
    36. @Override
    37. public Server getServer(int requestNumber, String requestAddress) {
    38. Server server;
    39. if (serverList.isEmpty()) {
    40. logger.warn("Don not have server available!");
    41. return null;
    42. }
    43. server = serverList.get(requestNumber % serverList.size());
    44. return server;
    45. }
    46. /**
    47. * 添加服务器节点
    48. *
    49. * @param server server
    50. */
    51. @Override
    52. public void addServerNode(Server server) {
    53. for (int i = 0; i < server.getWeight(); i++) {
    54. serverList.add(server);
    55. }
    56. }
    57. /**
    58. * 删除服务器节点
    59. *
    60. * @param server server
    61. */
    62. @Override
    63. public void delServerNode(Server server) {
    64. serverList.removeIf(server1 -> server1.getAddress().equals(server.getAddress()) && server1.getPort().equals(server.getPort()));
    65. }
    66. }

    HashServerImpl.java

    余数Hash负载均衡实现类,对客户端ip进行Hash运算,对服务节点数量取余来获取相应节点

    1. package system.random.imp;
    2. import org.apache.log4j.Logger;
    3. import system.common.GetHashCode;
    4. import system.entity.Server;
    5. import system.random.BalanceService;
    6. import java.util.Collections;
    7. import java.util.List;
    8. /**
    9. * 余数Hash实现类
    10. *
    11. * @author xuwei
    12. * @date 2022/07/18 10:41
    13. **/
    14. public class HashServerImpl implements BalanceService {
    15. private static final Logger logger = Logger.getLogger(HashServerImpl.class);
    16. /**
    17. * 服务器列表
    18. */
    19. private final List serverList;
    20. public HashServerImpl(List serverList) {
    21. this.serverList = Collections.synchronizedList(serverList);
    22. }
    23. /**
    24. * 获取服务器
    25. * hash直接取余法
    26. *
    27. * @param requestNumber
    28. * @param requestAddress
    29. * @return
    30. */
    31. @Override
    32. public Server getServer(int requestNumber, String requestAddress) {
    33. Server server;
    34. if (serverList.isEmpty()) {
    35. logger.warn("Don not have server available!");
    36. return null;
    37. }
    38. server = serverList.get(GetHashCode.getHashCode(requestAddress) % serverList.size());
    39. return server;
    40. }
    41. /**
    42. * 添加服务器节点
    43. *
    44. * @param server server
    45. */
    46. @Override
    47. public void addServerNode(Server server) {
    48. serverList.add(server);
    49. }
    50. /**
    51. * 删除服务器节点
    52. *
    53. * @param server server
    54. */
    55. @Override
    56. public void delServerNode(Server server) {
    57. serverList.removeIf(server1 -> server1.getAddress().equals(server.getAddress()) && server1.getPort().equals(server.getPort()));
    58. }
    59. }

    ConsistentHashServerImpl.java

    一致性Hash负载均衡实现类,可配置虚拟节点数量,使用TreeMap模拟一致性Hash环,客户端连接到达后计算ip的Hash之后去环内找它后一个节点,如果没有则找第一个节点

    1. package system.random.imp;
    2. import org.apache.log4j.Logger;
    3. import system.common.GetHashCode;
    4. import system.entity.Server;
    5. import system.random.BalanceService;
    6. import java.util.List;
    7. import java.util.Map;
    8. import java.util.TreeMap;
    9. /**
    10. * 一致性Hash实现类
    11. *
    12. * @author xuwei
    13. * @date 2022/07/18 10:41
    14. **/
    15. public class ConsistentHashServerImpl implements BalanceService {
    16. private static final Logger logger = Logger.getLogger(ConsistentHashServerImpl.class);
    17. /**
    18. * 虚拟节点数
    19. */
    20. private final Integer vnnNodeCount;
    21. /**
    22. * 一致性hash环
    23. */
    24. private final TreeMap treeMapHash;
    25. public ConsistentHashServerImpl(List serverList, Integer vnnNodeCount) {
    26. this.vnnNodeCount = vnnNodeCount;
    27. TreeMap treeMapHash = new TreeMap<>();
    28. for (Server server : serverList) {
    29. int hash = GetHashCode.getHashCode(server.getAddress() + server.getPort());
    30. treeMapHash.put(hash, server);
    31. for (int i = 1; i <= this.vnnNodeCount; i++) {
    32. treeMapHash.put(GetHashCode.getHashCode(server.getAddress() + server.getPort() + "&&" + i), server);
    33. }
    34. }
    35. this.treeMapHash = treeMapHash;
    36. }
    37. /**
    38. * 获取服务器
    39. *
    40. * @param requestNumber 请求量
    41. * @param requestAddress 请求地址
    42. * @return
    43. */
    44. @Override
    45. public Server getServer(int requestNumber, String requestAddress) {
    46. Server server;
    47. synchronized (treeMapHash) {
    48. if (treeMapHash.isEmpty()) {
    49. logger.warn("Don not have server available!");
    50. return null;
    51. }
    52. int hash = GetHashCode.getHashCode(requestAddress);
    53. // 向右寻找第一个 key
    54. Map.Entry subEntry = treeMapHash.ceilingEntry(hash);
    55. // 设置成一个环,如果超过尾部,则取第一个点
    56. subEntry = subEntry == null ? treeMapHash.firstEntry() : subEntry;
    57. server = subEntry.getValue();
    58. }
    59. return server;
    60. }
    61. /**
    62. * 添加服务器节点
    63. *
    64. * @param server server
    65. */
    66. @Override
    67. public void addServerNode(Server server) {
    68. synchronized (treeMapHash) {
    69. int hash = GetHashCode.getHashCode(server.getAddress());
    70. treeMapHash.put(hash, server);
    71. for (int i = 1; i <= vnnNodeCount; i++) {
    72. int vnnNodeHash = GetHashCode.getHashCode(server.getAddress() + server.getPort() + "&&" + i);
    73. treeMapHash.put(vnnNodeHash, server);
    74. }
    75. }
    76. }
    77. /**
    78. * 删除服务器节点
    79. *
    80. * @param server server
    81. */
    82. @Override
    83. public void delServerNode(Server server) {
    84. synchronized (treeMapHash) {
    85. int hash = GetHashCode.getHashCode(server.getAddress() + server.getPort());
    86. treeMapHash.remove(hash);
    87. for (int i = 1; i <= vnnNodeCount; i++) {
    88. int vnnNodeHash = GetHashCode.getHashCode(server.getAddress() + server.getPort() + "&&" + i);
    89. treeMapHash.remove(vnnNodeHash);
    90. }
    91. }
    92. }
    93. }

    ServerMonitorImpl.java

    服务监视器,每次获取server都会检测能否连通,连接失败则将节点移除并放入失败列表中,每三秒对列表中服务器重试,如果连接成功将节点放回并在失败列表中删除此服务

    1. package system.random.imp;
    2. import org.apache.log4j.Logger;
    3. import system.common.ConnectUtil;
    4. import system.entity.Server;
    5. import system.random.BalanceService;
    6. import java.util.Collections;
    7. import java.util.LinkedList;
    8. import java.util.List;
    9. import java.util.concurrent.locks.LockSupport;
    10. /**
    11. * 装饰器,实现服务监控,动态增减服务
    12. *
    13. * @author xuwei
    14. * @date 2022/07/27 16:58
    15. **/
    16. public class ServerMonitorImpl implements BalanceService {
    17. private static final Logger logger = Logger.getLogger(ServerMonitorImpl.class);
    18. private final BalanceService balanceService;
    19. /**
    20. * 连接失败服务器列表
    21. */
    22. private final List failServer = Collections.synchronizedList(new LinkedList<>());
    23. private final Thread serverMonitor;
    24. public ServerMonitorImpl(BalanceService balanceService) {
    25. this.balanceService = balanceService;
    26. Runnable runnable = () -> {
    27. logger.info("Server Monitor start!");
    28. while (true) {
    29. LockSupport.parkNanos(1000 * 1000 * 1000 * 3L);
    30. if (Thread.currentThread().isInterrupted()) {
    31. logger.info("Server Monitor stop!");
    32. return;
    33. }
    34. //对错误服务列表一直监控
    35. failServer.removeIf(server -> {
    36. if (ConnectUtil.telnet(server.getAddress(), server.getPort(), 200)) {
    37. addServerNode(server);
    38. return true;
    39. }
    40. return false;
    41. });
    42. }
    43. };
    44. this.serverMonitor = new Thread(runnable);
    45. this.serverMonitor.setName("server-monitor");
    46. this.serverMonitor.start();
    47. }
    48. /**
    49. * 获取服务器
    50. *
    51. * @param requestNumber 请求量
    52. * @param requestAddress 请求地址
    53. * @return
    54. */
    55. @Override
    56. public Server getServer(int requestNumber, String requestAddress) {
    57. Server server;
    58. while (true) {
    59. Server server1 = balanceService.getServer(requestNumber, requestAddress);
    60. if (server1 == null) {
    61. this.serverMonitor.interrupt();
    62. return null;
    63. }
    64. // 测试连接
    65. boolean isConnected = ConnectUtil.telnet(server1.getAddress(), server1.getPort(), 200);
    66. if (isConnected) {
    67. server = server1;
    68. break;
    69. } else {
    70. //失败则加入到失效服务器列表并删除此节点
    71. failServer.add(server1);
    72. delServerNode(server1);
    73. }
    74. }
    75. return server;
    76. }
    77. /**
    78. * 添加服务器节点
    79. *
    80. * @param server server
    81. */
    82. @Override
    83. public void addServerNode(Server server) {
    84. balanceService.addServerNode(server);
    85. }
    86. /**
    87. * 删除服务器节点
    88. *
    89. * @param server server
    90. */
    91. @Override
    92. public void delServerNode(Server server) {
    93. balanceService.delServerNode(server);
    94. }
    95. }

    Main.java

    ServerSocket监听端口,处理连接

    1. import org.apache.log4j.Logger;
    2. import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
    3. import system.configure.Configuration;
    4. import system.entity.Server;
    5. import system.random.BalanceService;
    6. import system.socket.SocketThread;
    7. import java.io.IOException;
    8. import java.net.ServerSocket;
    9. import java.net.Socket;
    10. import java.util.concurrent.SynchronousQueue;
    11. import java.util.concurrent.ThreadPoolExecutor;
    12. import java.util.concurrent.TimeUnit;
    13. /**
    14. * @ClassName Main
    15. * @Author xuwei
    16. * @DATE 2022/4/11
    17. */
    18. public class Main {
    19. public static final int SO_TIME_OUT = 300000;
    20. private static final Configuration CONFIGURATION = Configuration.getConfiguration("src/main/resources/xw-load-balancing.xml");
    21. private static final ThreadPoolExecutor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(5, 10,
    22. 60L, TimeUnit.SECONDS,
    23. new SynchronousQueue<>(), new CustomizableThreadFactory());
    24. private static final Logger logger = Logger.getLogger(Main.class);
    25. private static int requestNumber = 0;
    26. public static void main(String[] args) {
    27. BalanceService balanceService = CONFIGURATION.getBalanceService();
    28. try {
    29. //启动ServerSocket监听配置文件中的端口
    30. ServerSocket serverSocket = new ServerSocket(CONFIGURATION.getPort());
    31. logger.info("The service runs successfully on port " + CONFIGURATION.getPort());
    32. // 一直监听,接收到新连接,则开启新线程去处理
    33. while (true) {
    34. Socket localSocket = serverSocket.accept();
    35. //判断请求次数是否将要溢出
    36. requestNumber = requestNumber == Integer.MAX_VALUE ? 0 : ++requestNumber;
    37. //根据负载均衡算法获取转发服务器
    38. Server server = balanceService.getServer(requestNumber, localSocket.getInetAddress().getHostAddress());
    39. if (server == null) {
    40. System.exit(0);
    41. }
    42. //5分钟内无数据传输、关闭链接
    43. localSocket.setSoTimeout(SO_TIME_OUT);
    44. logger.info(localSocket.getRemoteSocketAddress().toString().replace("/", "") + " connect to server : \"" + server.getServerName() + "\"");
    45. //启动线程处理本连接
    46. THREAD_POOL_EXECUTOR.submit(new SocketThread(localSocket, server.getAddress(), server.getPort()));
    47. }
    48. } catch (IOException e) {
    49. e.printStackTrace();
    50. }
    51. }
    52. }

    总结

    此项目只是一个简单的实践,目前IO模型还需优化

    源码地址

    https://github.com/CodeXu-cyber/xw-load-balancing.git

  • 相关阅读:
    【skynet】skynet入口解析
    一文带你深入浅出C语言动态内存分配
    服务器数据恢复-Xen server虚拟机数据恢复案例
    Shell 函数
    OpenAI内斗剧情反转!微软力保ChatGPT之父回归?
    会议OA系统
    关于CSS 盒子模型的基础教程
    为知笔记打不开 ziw 文件问题
    Redis常见面试题总结
    Vue3的手脚架使用和组件父子间通信-插槽(Options API)学习笔记
  • 原文地址:https://blog.csdn.net/qq_16469323/article/details/126022905