• 35. 干货系列从零用Rust编写负载均衡及代理,代理服务器的源码升级改造


    wmproxy

    wmproxy已用Rust实现http/https代理, socks5代理, 反向代理, 静态文件服务器,四层TCP/UDP转发,七层负载均衡,内网穿透,后续将实现websocket代理等,会将实现过程分享出来,感兴趣的可以一起造个轮子

    项目地址

    国内: https://gitee.com/tickbh/wmproxy

    github: https://github.com/tickbh/wmproxy

    项目设计目标

    在同一个端口上同时支持HTTP/HTTPS/SOCKS5代理,即假设监听8090端口,那么可以设置如下:

    curl --proxy socks5://127.0.0.1:8090 http://www.baidu.com
    curl --proxy http://127.0.0.1:8090 http://www.baidu.com
    curl --proxy http://127.0.0.1:8090 https://www.baidu.com

    以上方案需要都可以兼容打通,才算成功。

    初始方案

    不做HTTP服务器,仅简单的解析数据流,然后进行数据转发

    pub async fn process(
    username: &Option<String>,
    password: &Option<String>,
    mut inbound: T,
    ) -> Result<(), ProxyError>
    where
    T: AsyncRead + AsyncWrite + Unpin,
    {
    let mut outbound;
    let mut request;
    let mut buffer = BinaryMut::new();
    loop {
    let size = {
    let mut buf = ReadBuf::uninit(buffer.chunk_mut());
    inbound.read_buf(&mut buf).await?;
    buf.filled().len()
    };
    if size == 0 {
    return Err(ProxyError::Extension("empty"));
    }
    unsafe {
    buffer.advance_mut(size);
    }
    request = webparse::Request::new();
    // 通过该方法解析标头是否合法, 若是partial(部分)则继续读数据
    // 若解析失败, 则表示非http协议能处理, 则抛出错误
    // 此处clone为浅拷贝,不确定是否一定能解析成功,不能影响偏移
    match request.parse_buffer(&mut buffer.clone()) {
    Ok(_) => match request.get_connect_url() {
    Some(host) => {
    match HealthCheck::connect(&host).await {
    Ok(v) => outbound = v,
    Err(e) => {
    Self::err_server_status(inbound, 503).await?;
    return Err(ProxyError::from(e));
    }
    }
    break;
    }
    None => {
    if !request.is_partial() {
    Self::err_server_status(inbound, 503).await?;
    return Err(ProxyError::UnknownHost);
    }
    }
    },
    Err(WebError::Http(HttpError::Partial)) => {
    continue;
    }
    Err(_) => {
    return Err(ProxyError::Continue((Some(buffer), inbound)));
    }
    }
    }
    match request.method() {
    &Method::Connect => {
    log::trace!(
    "https connect {:?}",
    String::from_utf8_lossy(buffer.chunk())
    );
    inbound.write_all(b"HTTP/1.1 200 OK\r\n\r\n").await?;
    }
    _ => {
    outbound.write_all(buffer.chunk()).await?;
    }
    }
    let _ = copy_bidirectional(&mut inbound, &mut outbound).await?;
    Ok(())
    }

    此方案仅做浅解析,处理相当高效,但遇到如下问题:

    • HTTP/HTTPS代理服务器需要验证密码
    • HTTP服务存在不同的协议,此方法只兼容HTTP/1.1,无法兼容明确的HTTP/2协议
    • 请求的协议头有些得做修改,此方法无法修改

    改造方案

    • 引入HTTP服务器介入
    • 但是因为需要兼容不同协议,只有等确定协议后才能引入协议,需要预读数据,进行协议判定。
    • HTTPS代理协议只处理一组Connect协议,之后需要解除http协议进行双向绑定。

    完整源码

    1. 预读数据
    • Socks5:第一个字节为0X05,非ascii字符,其它协议不会影响
    • Https: https代理必须发送Connect方法,所以必须以CONNECT或者connect开头,且查询其它HTTP方法没有以C开头的,这里仅判断第一个字符为C或者c,该协议仅处理一条http请求不参与后续TLS握手协议等保证数据安全
    • 其它开头的均被认为http代理
    let mut buffer = BinaryMut::with_capacity(24);
    let size = {
    let mut buf = ReadBuf::uninit(buffer.chunk_mut());
    inbound.read_buf(&mut buf).await?;
    buf.filled().len()
    };
    if size == 0 {
    return Err(ProxyError::Extension("empty"));
    }
    unsafe {
    buffer.advance_mut(size);
    }
    // socks5 协议, 直接返回, 交给socks5层处理
    if buffer.as_slice()[0] == 5 {
    return Err(ProxyError::Continue((Some(buffer), inbound)));
    }
    let mut max_req_num = usize::MAX;
    // https 协议, 以connect开头, 仅处理一条HTTP请求
    if buffer.as_slice()[0] == b'C' || buffer.as_slice()[0] == b'c' {
    max_req_num = 1;
    }
    1. 构建HTTP服务器,构建服务类:
    /// http代理类处理类
    struct Operate {
    /// 用户名
    username: Option<String>,
    /// 密码
    password: Option<String>,
    /// Stream类, https连接后给后续https使用
    stream: Option,
    /// http代理keep-alive的复用
    sender: Option>,
    /// http代理keep-alive的复用
    receiver: Option>>,
    }

    构建HTTP服务

    // 需要将已读的数据buffer重新加到server的已读cache中, 否则解析会出错
    let mut server = Server::new_by_cache(inbound, None, buffer);
    // 构建HTTP服务回调
    let mut operate = Operate {
    username: username.clone(),
    password: password.clone(),
    stream: None,
    sender: None,
    receiver: None,
    };
    server.set_max_req(max_req_num);
    let _e = server.incoming(&mut operate).await?;
    if let Some(outbound) = &mut operate.stream {
    let mut inbound = server.into_io();
    let _ = copy_bidirectional(&mut inbound, outbound).await?;
    }

    此时我们已将数据用HTTP服务进行处理,收到相应的请求再进行给远端做转发:

    HTTP核心处理回调,此处我们用的是async_trait异步回调

    #[async_trait]
    impl OperateTrait for &mut Operate {
    async fn operate(&mut self, request: &mut RecvRequest) -> ProtResult {
    // 已连接直接进行后续处理
    if let Some(sender) = &self.sender {
    sender.send(request.replace_clone(Body::empty())).await?;
    if let Some(res) = self.receiver.as_mut().unwrap().recv().await {
    return Ok(res?)
    }
    return Err(ProtError::Extension("already close by other"))
    }
    // 获取要连接的对象
    let stream = if let Some(host) = request.get_connect_url() {
    match HealthCheck::connect(&host).await {
    Ok(v) => v,
    Err(e) => {
    return Err(ProtError::from(e));
    }
    }
    } else {
    return Err(ProtError::Extension("unknow tcp stream"));
    };
    // 账号密码存在,将获取`Proxy-Authorization`进行校验,如果检验错误返回407协议
    if self.username.is_some() && self.password.is_some() {
    let mut is_auth = false;
    if let Some(auth) = request.headers_mut().remove(&"Proxy-Authorization") {
    if let Some(val) = auth.as_string() {
    is_auth = self.check_basic_auth(&val);
    }
    }
    if !is_auth {
    return Ok(Response::builder().status(407).body("")?.into_type());
    }
    }
    // 判断用户协议
    match request.method() {
    &Method::Connect => {
    // https返回200内容直接进行远端和客户端的双向绑定
    self.stream = Some(stream);
    return Ok(Response::builder().status(200).body("")?.into_type());
    }
    _ => {
    // http协议,需要将客户端的内容转发到服务端,并将服务端数据转回客户端
    let client = Client::new(ClientOption::default(), MaybeHttpsStream::Http(stream));
    let (mut recv, sender) = client.send2(request.replace_clone(Body::empty())).await?;
    match recv.recv().await {
    Some(res) => {
    self.sender = Some(sender);
    self.receiver = Some(recv);
    return Ok(res?)
    },
    None => return Err(ProtError::Extension("already close by other")),
    }
    }
    }
    }
    }

    密码校验,由Basic的密码加密方法,先用base64解密,再用:做拆分,再与用户密码比较

    pub fn check_basic_auth(&self, value: &str) -> bool
    {
    use base64::engine::general_purpose;
    use std::io::Read;
    let vals: Vec<&str> = value.split_whitespace().collect();
    if vals.len() == 1 {
    return false;
    }
    let mut wrapped_reader = Cursor::new(vals[1].as_bytes());
    let mut decoder = base64::read::DecoderReader::new(
    &mut wrapped_reader,
    &general_purpose::STANDARD);
    // handle errors as you normally would
    let mut result: Vec<u8> = Vec::new();
    decoder.read_to_end(&mut result).unwrap();
    if let Ok(value) = String::from_utf8(result) {
    let up: Vec<&str> = value.split(":").collect();
    if up.len() != 2 {
    return false;
    }
    if up[0] == self.username.as_ref().unwrap() ||
    up[1] == self.password.as_ref().unwrap() {
    return true;
    }
    }
    return false;
    }

    小结

    代理在计算机网络很常见,比如服务器群组内部通常只会开一个口进行对外访问,就可以通过内网代理来进行处理,从而更好的保护内网服务器。代理让我们网络更安全,但是警惕非正规的代理可能会窃取您的数据。请用HTTPS内容访问更安全。

    点击 [关注][在看][点赞] 是对作者最大的支持

  • 相关阅读:
    校园网络工程规划与设计
    VMware WorkStation 16的配置安装、CentOS 7的配置安装、使用Finalshell访问虚拟机(22.9.14)
    5.11 Firmware Commit command
    Java开发二面被疯狂问JVM相关,被整懵了!!
    pyflink map 字典写入ES
    SpringMVC---SSM整合&&异常处理&&拦截器
    关于Win10安装keil并且电脑连接jlink踩过的坑
    Android中 输入框输入值时,软键盘弹出后压缩布局(布局上移)的解决方法。
    linux服务器添置一块新硬盘操作
    基于若依ruoyi-nbcio增加flowable流程待办消息的提醒,并提供右上角的红字数字提醒(三)
  • 原文地址:https://www.cnblogs.com/wmproxy/p/wmproxy35.html