11. 用Rust手把手编写一个wmproxy(代理,内网穿透等), 实现健康检查

rust,手把手,编写,一个,wmproxy,代理,内网,穿透,实现,健康检查 · 浏览次数 : 15

小编点评

```rust use std::sync::mpsc; use std::thread; use std::sync::atomic::Atomic; struct HealthCheck { fail_timeout: usize, max_fails: usize, min_rises: usize, health_map: HashMap, } struct HealthRecord { last_record: Instant, fail_timeout: Duration, fall_times: usize, rise_times: usize, failed: bool, } impl HealthCheck { fn new(fail_timeout: usize, max_fails: usize, min_rises: usize) -> Self { HealthCheck { fail_timeout, max_fails, min_rises, health_map: Atomic::new(HashMap::new()), } } fn add_rise_up(&mut self, addr: SocketAddr) { let mut health_record = self.health_map.entry(addr).or_default(); health_record.rise_times += 1; if health_record.rise_times >= self.min_rises { health_record.failed = false; } } fn add_fall_down(&mut self, addr: SocketAddr) { self.health_map.entry(addr).or_default().failed = true; } } impl TcpStream for HealthCheck { fn connect(&mut self, addr: &str) -> io::Result { let mut addrs = addr.to_socket_addrs()?; for addr in addrs { if self.is_fall_down(&addr) { return Err(io::Error::new(io::ErrorKind::Other, "health check falldown")); } } TcpStream::connect(&addr).await } } ``` **使用方法:** ```rust // Create a channel for communication between threads let (tx, rx) = mpsc::channel(); // Create threads for health check and connection let health_check_thread = thread::spawn(move || { let health_check = HealthCheck::new(10, 3, 5); for _ in 0..10 { let addr = "127.0.0.1:80"; tx.send(addr); rx.recv().unwrap(); health_check.add_rise_up(addr); } }); // Connect to the server and send requests let (stream, _) = TcpStream::connect("127.0.0.1:80"); // ... use the stream ... ```

正文

11. 用Rust手把手编写一个wmproxy(代理,内网穿透等), 实现健康检查

项目 ++wmproxy++

gite: https://gitee.com/tickbh/wmproxy

github: https://github.com/tickbh/wmproxy

健康检查的意义

健康检查维持着系统的稳定运行, 极大的加速着服务的响应时间, 并保证服务器不会把消息包转发到不能响应的服务器上, 从而使系统快速稳定的运转
在LINUX系统中,系统默认TCP建立连接超时时间为127秒。通常网络不可达或者网络连接被拒绝或者网络连接超时需要耗时的时长较长。此时会超成服务器的响应时间变长很多,而且重复发起不可达的连接尝试也在耗着大量的IO资源。
当健康检查介入后,如果短时间内多次建立连接失败,则暂时判定该地址不可达,状态设置为不可达。如果此时接收到该地址的请求时直接返回错误。大大提高了响应的时间。
所以健康检查是必不可少的存在。

如何实现

由于健康状态需要调用的地方可能在任意处需要发起连接的地方,如果通过参数透传也会涉及到多线程的数据共用,如Arc<Mutex<Data>>,取用的时候也是要通过锁共用,且编码的复杂度和理解成本急剧升高,所以此处健康检查选用的是多线程共用的静态处理变量。

Rust中的静态变量

在Rust中,全局变量可以分为两种:

  • 编译期初始化的全局变量
  • 运行期初始化的全局变量

编译期初始化的全局变量有:

const创建的常量,如 const MAX_ID:usize=usize::MAX/2;
static创建的静态变量,如 static mut REQUEST_RECV:usize=0;

运行期初始化的全局变量有lazy_static用于懒初始化。例如:

lazy_static! {
    static ref HEALTH_CHECK: RwLock<HealthCheck> = RwLock::new(HealthCheck::new(60, 3, 2));
}

此外还有

  • 实现你自己的运行时初始化:std::sync::Once + static mut T
  • 单线程运行时初始化的特殊情况:thread_local

我们此处维持一个HealthCheck的全局变量,因为程序是多线程,用thread_local,无法共用其它线程的检测,不条例预期,所以此处用读写锁来保证全局变量的正确性,读写锁的特点是允许存在多个读,但如果获取写必须保证唯一。

源码解析,暂时不做主动性的健康检查

接下来我们看HealthCheck的定义

pub struct HealthCheck {
    /// 健康检查的重置时间, 失败超过该时间会重新检查, 统一单位秒
    fail_timeout: usize,
    /// 最大失败次数, 一定时间内超过该次数认为不可访问
    max_fails: usize,
    /// 最小上线次数, 到达这个次数被认为存活
    min_rises: usize,
    /// 记录跟地址相关的信息
    health_map: HashMap<SocketAddr, HealthRecord>,
}

/// 每个SocketAddr的记录值
struct HealthRecord {
    /// 最后的记录时间
    last_record: Instant,
    /// 失败的恢复时间
    fail_timeout: Duration,
    /// 当前连续失败的次数
    fall_times: usize,
    /// 当前连续成功的次数
    rise_times: usize,
    /// 当前的状态
    failed: bool,
}

主要有最后记录时间,失败次数,成功次数,最大失败惩罚时间等元素组成

我们通过函数is_fall_down判定是否是异常状态,未检查前默认为正常状态,超出一定时间后,解除异常状态。

/// 检测状态是否能连接
pub fn is_fall_down(addr: &SocketAddr) -> bool {
    // 只读,获取读锁
    if let Ok(h) = HEALTH_CHECK.read() {
        if !h.health_map.contains_key(addr) {
            return false;
        }
        let value = h.health_map.get(&addr).unwrap();
        if Instant::now().duration_since(value.last_record) > value.fail_timeout {
            return false;
        }
        h.health_map[addr].failed
    } else {
        false
    }
}

如果连接TCP失败则调用add_fall_down将该地址失败连接次数+1,如果失败次数达到最大失败次数将状态置为不可用。

/// 失败时调用
pub fn add_fall_down(addr: SocketAddr) {
    // 需要写入,获取写入锁
    if let Ok(mut h) = HEALTH_CHECK.write() {
        if !h.health_map.contains_key(&addr) {
            let mut health = HealthRecord::new(h.fail_timeout);
            health.fall_times = 1;
            h.health_map.insert(addr, health);
        } else {
            let max_fails = h.max_fails;
            let value = h.health_map.get_mut(&addr).unwrap();
            // 超出最大的失败时长,重新计算状态
            if Instant::now().duration_since(value.last_record) > value.fail_timeout {
                value.clear_status();
            }
            value.last_record = Instant::now();
            value.fall_times += 1;
            value.rise_times = 0;

            if value.fall_times >= max_fails {
                value.failed = true;
            }
        }
    }
}

如果连接TCP成功则调用add_rise_up将该地址成功连接次数+1,如果成功次数达到最小次数将状态置为不可用。

/// 成功时调用
pub fn add_rise_up(addr: SocketAddr) {
    // 需要写入,获取写入锁
    if let Ok(mut h) = HEALTH_CHECK.write() {
        if !h.health_map.contains_key(&addr) {
            let mut health = HealthRecord::new(h.fail_timeout);
            health.rise_times = 1;
            h.health_map.insert(addr, health);
        } else {
            let min_rises = h.min_rises;
            let value = h.health_map.get_mut(&addr).unwrap();
            // 超出最大的失败时长,重新计算状态
            if Instant::now().duration_since(value.last_record) > value.fail_timeout {
                value.clear_status();
            }
            value.last_record = Instant::now();
            value.rise_times += 1;
            value.fall_times = 0;

            if value.rise_times >= min_rises {
                value.failed = false;
            }
        }
    }
}

接下来我们将TcpStream::connect函数统一替换成HealthCheck::connect外部修改几乎为0,可实现开启健康检查,后续还会有主动式的健康检查。

pub async fn connect<A>(addr: &A) -> io::Result<TcpStream>
    where
        A: ToSocketAddrs,
    {
        let addrs = addr.to_socket_addrs()?;
        let mut last_err = None;

        for addr in addrs {
            // 健康检查失败,直接返回错误
            if Self::is_fall_down(&addr) {
                last_err = Some(io::Error::new(io::ErrorKind::Other, "health check falldown"));
            } else {
                match TcpStream::connect(&addr).await {
                    Ok(stream) => 
                    {
                        Self::add_rise_up(addr);
                        return Ok(stream)
                    },
                    Err(e) => {
                        Self::add_fall_down(addr);
                        last_err = Some(e)
                    },
                }
            }
        }

        Err(last_err.unwrap_or_else(|| {
            io::Error::new(
                io::ErrorKind::InvalidInput,
                "could not resolve to any address",
            )
        }))
    }

效果

在前三次请求的时候,将花费5秒左右才抛出拒绝链接的错误

connect server Err(Os { code: 10061, kind: ConnectionRefused, message: "由于目标计算机积极拒绝,无
法连接。" })

可以发现三次之后,将会快速的抛出错误,达成健康检查的目标

connect server Err(Custom { kind: Other, error: "health check falldown" })

此时被动式的健康检查已完成,后续按需要的话将按需看是否实现主动式的健康检查。

与11. 用Rust手把手编写一个wmproxy(代理,内网穿透等), 实现健康检查相似的内容:

11. 用Rust手把手编写一个wmproxy(代理,内网穿透等), 实现健康检查

健康检查维持着系统的稳定运行, 极大的加速着服务的响应时间, 并保证服务器不会把消息包转发到不能响应的服务器上, 从而使系统快速稳定的运转

用Rust手把手编写一个Proxy(代理), UDP绑定篇

用Rust手把手编写一个Proxy(代理), UDP绑定篇 项目 ++wmproxy++ gite: https://gitee.com/tickbh/wmproxy github: https://github.com/tickbh/wmproxy 了解UDP 特点 UDP是基于IP的简单协议,不

Netcode for Entities如何添加自定义序列化,让GhostField支持任意类型?以int3为例(1.2.3版本)

一句话省流:很麻烦也很抽象,能用内置支持的类型就尽量用。 首先看文档。官方文档里一开头就列出了所有内置的支持的类型:Ghost Type Templates 其中Entity类型需要特别注意一下:在同步这个类型的时候,如果是刚刚Instantiate的Ghost(也就是GhostId尚未生效,上一篇

华为云短信服务教你用C++实现Smgp协议

本文简单对SGIP协议进行了介绍,并尝试用C++实现协议栈,但实际商用发送短信往往更加复杂,可以选择华为云消息&短信服务通过HTTP协议接入。

两种解法搞定Swap Nodes in Pairs算法题

最近还是很喜欢用golang来刷算法题,更接近通用算法,也没有像动态脚本语言那些语法糖,真正靠实力去解决问题。 下面这道题很有趣,也是一道链表题目,具体如下: 24. Swap Nodes in Pairs Solved Medium Topics Companies Given a linked

关于.Net 6.0 在Linux ,Docker容器中,不安装任何依赖就生成图形验证码!!!!!!!!!!!

在.Net Framework时代,我们生成验证码大多都是用System.Drawing。 在.Net 6中使用也是没有问题的。 但是,System.Drawing却依赖于Windows GDI+。 为了实现跨平台,我陷入了沉思!! 微软推荐使用SkiaSharp 进行替代,所以就开始了,踩坑之旅

Java中可以用的大数据推荐算法

在Java中实现大数据推荐算法时,通常会使用一些开源的机器学习库,如Apache Mahout、Weka、DL4J(DeepLearning4j,用于深度学习)或者Spark MLlib(用于在Spark集群上运行)。由于完整实现一个大数据推荐算法的代码量可能非常大,并且需要配合具体的数据集和环境进

如何用华为云ModelArts平台玩转Llama2

既然Llama 2现已人人可用,那么如何在华为云上去微调实现更多可能的应用呢?

C#11之原始字符串

最近.NET7.0和C#11相继发布,笔者也是第一时间就用上了C#11,其中C#11的有一个更新能解决困扰我多年的问题,也就是文章的标题原始字符串。 在使用C#11的原始字符串时,发现的一些有意思的东西,超出了我原本对它的期待,话不多说,我们一起来看看。 多年的困扰 我不知道大家有没有写过这样的代码

Aveva Marine VBNET 编程系列-封装一个类

由于AM的marapi的大部分类实现了IDisposable接口,所有避免内存过大,用了一般需要dispose下 微软官方的解释: https://learn.microsoft.com/zh-cn/dotnet/api/system.idisposable?view=net-7.0 以下是MarD