从中间件团队窃取了这个组件,见识到了编码能力的天花板!!

中间件,团队,窃取,这个,组件,见识,编码,能力,天花板 · 浏览次数 : 2311

小编点评

☆什么是我们说这款工具是一款基于“生产者-消费者模式”实现的组件? 以前生产者必须同步等待调用、等待相关业务操作的处理结果后才能返回(一旦有些业务场景生产者生产的速度过快,方法内部自身业务处理又比较耗时) 这时如果同步等待调用结果返回,系统整体吞吐量会极具降低 现在换了一种思路即生产者不需要同步等待业务的处理结果,当它发送一个请求后立即返回,耗时的处理由一致多个消费者线程来异步处理,加快任务整体处理速度。 ☆适用场景它比较适合处理一些重要程度不是很高的数据(比如埋点日志、下载请求等),当生产者生产数据过快,业务本身处理又比较耗时,那用这个方案是比较合适的。 ☆怎么实现的消费者线程在组件初始化的时候会更具指定threadNum数量完成新建。 run方法一开始就进行死循环检查,根据两个条件判断:条件一:工作线程对象内部的阻塞队列大小实际元素个数是否超过指定阈值;条件二:当前时间和上一次任务处理时间差是否超过指定阈值,如果两个都不满足,工作线程就通过LockSupport.park(this)将自己阻塞,等待条件之一被满足,然后被唤醒。每个工作线程内部的add方法和time方法是触发上述条件之一的入口,详细请参详架构设计部分。 ☆写到最后这款组件几乎囊括了并发编程领域半壁江山的技术点,能把这些散的点串起来,用好用对,着实不容易,也体现出组件作者深厚的基础技术功底。 如果你是《并发编程》的初学者亦或有几年经验的老兵,都建议好好揣摩与学习一下这款组件的架构设计与编码实现;如果在你的生产场景中你刚好也碰到类似问题与场景,那么这款组件也许能帮助你。

正文

大家好,我是陶朱公Boy,又跟大家见面了。

前言

今天跟大家分享一款基于“生产者消费者模式”下实现的组件。

该组件是作者偶然在翻阅公司一中间件源码的时候碰到的,觉得设计的非常精美、巧妙,花了点时间整理成文分享给大家。

生产者和消费者彼此之间不进行通信,中间通过一个容器(如阻塞队列)来解决强解耦问题。
阻塞队列起到了一定的数据缓冲作用,平衡了生产者和消费者对数据的处理能力。by—《Java并发编程的艺术》

组件介绍

该组件基于生产者消费者模式来编码实现,是一款本地化解决流量削峰、解耦、异步的利器。

此组件由以下知识点构成:线程池、阻塞队列、LockSupport、Executor框架、final、volatile。此外你还能接触到hash取模算法、接口回调等机制。

组件本身代码量并不大,但知识点比较密集,所以希望大家能花一点时间认真看完。我将从适用场景、架构设计、源码解析这三个角度给大家讲介绍这款组件。

适用场景

 

☆场景一:报表下载

现在很多后台下载功能,普适的做法是先筛选转换数据,然后对接云存储平台进行保存,最后生成一个可访问的文件地址,整个过程非常耗时。

其实完全可以生产者发送一个下载请求就结束响应,服务端异步的去消费这个任务请求,处理完生成地址后,再进行通知(比如更新对应数据库文件字段)这是一种异步体现,也解耦了生产者与消费者原来的同步交互方式,整体效率会更高。

☆场景二:日志埋点

有些应用它的QPS非常高,产生的数据本身并不是特别重要比如埋点的日志,如果实时调用埋点平台可能给平台侧造成非常大的访问压力。所以这个时候中间的阻塞队列就起到了一定的缓冲作用,等一段时间或队列数据量达到一定量(参赛可动态配置)再一次性拿出来转换后,最后批量传递出去。

☆场景三:Yana(阿里内部一款基于邮件分享技术文章的工具)

《Java并发编程的艺术》作者方腾飞有分享过他们基于生产者消费者模式实现的一个案例。

他们团队早期有一个习惯,大家如果在平时工作当中遇到比较好的文章,会通过邮件转发到专属邮箱进行内部分享,这样其他成员就能看到这篇文章,甚至大家会在底部评论、回复、交流。

但期间遇到一个问题:一旦时间一长,以前的文章很难被检阅。邮件列表的可视化太差,也不能进行归类,有些新入职员工也看不到以往其他成员分享过的文章。

基于这些问题,有几个小伙伴自发的趁业余时间开发了一个简易工具--yana。该工具功能就是:生产者线程会先往邮箱里将所有分享的邮件下载下来(包括附件、图片、邮件回复等内容),下载完成后,通过confluence的Web Service接口,把文章保存到confluence中去。这样不仅好维护,而且留存问题也得到了解决。

不过随着这款工具在其他部门的推广,发现系统响应时间越来越长。只要单位时间内积累邮件一多,一次处理完可能就要花费几分钟。

于是他们升级了方案,把架构演进到了V2.0版本。整体思路是使用了生产者消费者模式来处理。

思路如下:生产者线程去邮件系统下载完邮件后,不会立即调用confluence的web service接口,而是选择把下载的内容放入阻塞队列后立即返回。而消费者启动CPU*2个线程数来并行处理队列中的邮件,从之前的单线程演变成了多线程处理,生产者和消费者实现了异步、解耦。经过观察,比起V1.0同步处理,速度比之前要快好了几倍。 

...

架构设计

☆对象图

该组件支持“多生产者多消费者”场景,在多核时代充分利用CPU多核机制,消费者多线程并行处理阻塞队列中的数据,加快任务处理速度。

☆逻辑架构图

该组件内部持有一个工作线程对象数组,当生产者提交数据的时候,会先经过一个route组件(采用hash取模算法),动态路由到其中一个线程对象内的阻塞队列中存储起来。等到满足一定条件,工作线程就会将自身线程对象内阻塞队列中的数据转换成指定容量的List对象(BlockQueue的drainto方法有支持),然后调用已经注册的回调函数把数据传递出去。

☆流程图

我们一起来看下这张工作线程内部运行流程图:

首选我们说此组件对象持有一个工作线程对象数组,每个工作线程对象内部持有一个有界阻塞队列实例对象(ArrayBlockingQueue),主要有run(),add(),timeout()方法。

1)add方法:

生产者调用组件的add方法后,add方法内部通过hash取模算法动态路由到某个工作线程对象,继而调用对象内部的add方法,将对象添加到指定阻塞队列中去。

生产者线程调用组件的add方法后,add方法内部通过hash取模算法,会动态路由到指定工作线程对象,继而调用其内部的add方法。当工作者线程内部的add被调用后,方法底部都会检查当前队列的实际大小是否超过指定阈值(可配置),如果超过,就会触发LockSupport.unpark方法,唤醒被阻塞的工作线程。

2)timeout方法

ScheduledThreadPoolExecutor在组件初始化新建工作线程的时候,为每一个工作线程对象开启一个定时器,按固定时间间隔周期(可配置),检查工作线程距离上一次任务处理完的时间差是否超过指定阈值(可配置),如果超过就会触发LockSupport.unpark方法,唤醒被阻塞的工作线程。

很明显上述timeout机制,是为容错考虑。大家想过没有,通过add方法将元素添加到阻塞队列中去,假如某一个时间窗口,队列中的元素个数达不到指定阈值,那就尴尬了,工作线程永远无法将队列中的元素消费掉,这个时候从高可用角度就需要双通道保障机制了,而timeout就是为了应对这种场景而设计的容错方案。

源码赏析

public class ProducerAndConsumerComponet {
    private final static Logger log = LoggerFactory.getLogger(ProducerAndConsumerComponet.class);

    //组件持有一个工作线程对象数组
    private final WorkThread<T>[] workThreads;
    private AtomicInteger index;
    private static final Random r = new Random();
    //任务定时器
    private static ScheduledExecutorService scheduleThreadPool = new ScheduledThreadPoolExecutor(1);
    //组件初始化完成工作线程的新建
    private static ExecutorService executorService = Executors.newCachedThreadPool();

    /**
     * 构造器
     * @param threadNum 默认新建的消费者线程个数
     * @param limitSize 队列长度阈值;超过将唤醒阻塞的线程
     * @param period 前后两个任务的执行周期 (for example :200ms 代表前面一次任务执行完毕后,200毫秒后下一个任务继续执行)
     * @param capacity 工作线程内部的有界阻塞队列的初始容量大小
     * @param processor 回调接口(初始化组价实例的时候需要传递)
     */
    public ProducerAndConsumerComponet(int threadNum,int limitSize, int period, int capacity, Processor<T> processor) {
        this.workThreads = new WorkThread[threadNum];
        if (threadNum > 1) {
            this.index = new AtomicInteger();
        }
        for(int i = 0; i < threadNum; ++i) {
            WorkThread<T> workThread = new WorkThread("workThread"+ "_" + i, limitSize, period, capacity, processor);
            this.workThreads[i] = workThread;

            executorService.submit(workThread);
            //调用scheduleAtFixedRate时,会向ScheduledThreadPoolExecutor的DelayQueue添加一个实现了RunableScheduleFuture接口的
            //ScheduleFutureTask
            scheduleThreadPool.scheduleAtFixedRate(workThread::timeout, r.nextInt(50), period, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * 生产者线程将对象添加到对应消费者线程对象内部的阻塞队列中去<br>
     * 内部采用HASH取模算法进行动态路由
     * @param item 待添加的对象
     * @return true:添加成功 false:添加失败
     */
    public boolean add(T item) {
       // log.info("add item={}",item);
        int len = this.workThreads.length;
        //log.info("add len..."+len);
        if (len == 1) {
            return this.workThreads[0].add(item);
        } else {
            int mod = this.index.incrementAndGet() % len;
         // log.info("路由到this.workThreads[mod]={}",mod);
            return this.workThreads[mod].add(item);
        }
    }

    /**
     * 消费者线程
     */
    private static class WorkThread<T> implements Runnable {
        /**
         * 工作线程命名
         */
        private final String threadName;
        /**
         * 队列中允许存放元素个数限制<br>
         * 超出将从队列中取出此大小的元素转成List对象
         */
        private final int queueSizeLimit;
        /**
         * 前后两个任务的执行周期
         */
        private int period;
        /**
         * 用来记录任务的即时处理时间
         */
        private volatile long lastFlushTime;

        /**
         * 当前工作线程对象
         */
        private volatile Thread currentThread;

        /**
         * 工作线程对象内部的阻塞队列
         */
        private final BlockingQueue<T> queue;
        /**
         * 回调接口
         */
        private final Processor<T> processor;


        /**
         * 消费者线程构造器
         * @param threadName 线程名
         * @param queueSizeLimit 指定队列阈值(可配置)
         * @param period 前后两个任务的执行周期(可配置)
         * @param capacity 阻塞队列初始容量
         * @param processor 回调接口
         */
        public WorkThread(String threadName, int queueSizeLimit, int period, int capacity, Processor<T> processor) {
            this.threadName = threadName;
            this.queueSizeLimit = queueSizeLimit;
            this.period = period;
            this.lastFlushTime = System.currentTimeMillis();
            this.processor = processor;
            this.queue = new ArrayBlockingQueue(capacity);
        }

        /**
         * 往阻塞队列中添加元素
         * @param item 添加的对象
         * @return true:添加成功 false:添加失败
         */
        public boolean add(T item) {
        // log.info("add result:"+item);
            boolean result = this.queue.offer(item);
          // log.info("resultP{}",result);
            this.checkQueueSize();
            return result;
        }

        /**
         * 当前时间与上次任务处理时间差是否超过指定阈值;如果超过触发start方法
         */
        public void timeout() {
          // log.info("{}====check timeout",currentThread.getName());
            if (System.currentTimeMillis() - this.lastFlushTime >= (long)this.period) {
                log.info("当前时间距离上次任务处理时间周期={}超出指定阈值={}",System.currentTimeMillis() - this.lastFlushTime ,period);
                this.start();
            }

        }

        /**
         * 唤醒被阻塞的工作线程
         */
        private void start() {
            log.info("执行start方法,唤醒被阻塞的线程"+currentThread.getName());
            LockSupport.unpark(this.currentThread);
        }

        /**
         * 判断队列实际长度是否超过指定阈值;如果超过触发start方法
         */
        private void checkQueueSize() {
            if (this.queue.size() > this.queueSizeLimit) {
                log.info("{}队列大小={}超出指定阈值={}",currentThread.getName(),this.queue.size() ,queueSizeLimit);
                this.start();
            }

        }

        /**
         * 将队列中的元素通过调用<code>drainTo</code>方法,转成List对象(容量受queueSizeLimit限制),最后调用回调函数传递List对象
         */
        public void flush() {
            if(queue.isEmpty()){
                return;
            }

            this.lastFlushTime = System.currentTimeMillis();
            List<T> temp = new ArrayList(this.queueSizeLimit);
            int size = this.queue.drainTo(temp, this.queueSizeLimit);
            if (size > 0) {
                log.info("{}被唤醒后,开始执行任务:从队列中腾出大小为{}的数据且转成List对象",currentThread.getName(),size);
                try {
                    //执行回调函数
                    this.processor.process(temp);
                } catch (Throwable var4) {
                    System.out.println("process error");
                }
            }

        }

        /**
         * 判断队列实际大小是否超过指定阈值亦或距离上次任务处理时间差是否超过指定阈值
         * @return true:满足触发条件 false:不满足触发条件
         */
        private boolean canFlush() {
            return this.queue.size() > this.queueSizeLimit || System.currentTimeMillis() - this.lastFlushTime > (long)this.period;
        }

        @Override
        public void run() {
            this.currentThread = Thread.currentThread();
            this.currentThread.setName(this.threadName);
            //当前线程没有被其他线程打断
            while(!this.currentThread.isInterrupted()) {
                //死循环的判断是否满足触发条件(队列实际大小是否超出指定阈值或距离上次任务时间是否超出指定阈值),如果未满足将阻塞当前线程,避免死循环给系统带来性能开销
                while(!this.canFlush()) {
                    //当前工作线程被阻塞
                    log.info("线程被阻塞...");
                    LockSupport.park(this);
                }
                //一旦add方法执行的时候判断存放的阻塞队列元素大小超出自定制阈值亦或距离上次任务处理时间差超出指定阈值,就会调用LockSupport.unpark方法解除阻塞的线程
                //一旦线程被解除阻塞,就会触发此方法,将队列元素转成List对象且调用已经注册的回调函数
              // log.info("阻塞线程被唤醒");
                this.flush();
            }

        }
    }
复制代码

测试用例

/**
   * 前置条件:
   * #主件初始化默认新增两个工作线程
   * config.threadNum=2
   * config.period=12000
   * config.queueSizeLimit=3
   * config.capacity=10
   */
  @DisplayName("队列大小超出指定阈值")
  @Test
  void add() {
    for(int i=0;i<10;i++){

      producerAndConsumerComponet.add("1");
    }
    try {
      TimeUnit.SECONDS.sleep(10);
    }catch (Exception e){
      e.printStackTrace();
    }
  }

结果打印:

2022-10-29 21:04:51,656 [main] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:165] - workThread_1队列大小=4超出指定阈值=3
2022-10-29 21:04:51,658 [main] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:156] - 执行start方法,唤醒被阻塞的线程workThread_1
2022-10-29 21:04:51,659 [workThread_1] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:183] - workThread_1被唤醒后,开始执行任务:从队列中腾出大小为3的数据且转成List对象
2022-10-29 21:04:51,659 [workThread_1] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:211] - 线程被阻塞...
2022-10-29 21:04:51,659 [main] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:165] - workThread_0队列大小=4超出指定阈值=3
2022-10-29 21:04:51,659 [main] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:156] - 执行start方法,唤醒被阻塞的线程workThread_0
2022-10-29 21:04:51,660 [workThread_0] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:183] - workThread_0被唤醒后,开始执行任务:从队列中腾出大小为3的数据且转成List对象
2022-10-29 21:04:51,660 [workThread_0] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:211] - 线程被阻塞...
2022-10-29 21:04:53,374 [pool-1-thread-1] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:146] - 当前时间距离上次任务处理时间周期=1714超出指定阈值=1000
2022-10-29 21:04:53,374 [pool-1-thread-1] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:156] - 执行start方法,唤醒被阻塞的线程workThread_0
2022-10-29 21:04:53,375 [workThread_0] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:183] - workThread_0被唤醒后,开始执行任务:从队列中腾出大小为2的数据且转成List对象
2022-10-29 21:04:53,375 [workThread_0] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:211] - 线程被阻塞...
2022-10-29 21:04:53,379 [pool-1-thread-1] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:146] - 当前时间距离上次任务处理时间周期=1720超出指定阈值=1000
2022-10-29 21:04:53,379 [pool-1-thread-1] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:156] - 执行start方法,唤醒被阻塞的线程workThread_1
2022-10-29 21:04:53,380 [workThread_1] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:183] - workThread_1被唤醒后,开始执行任务:从队列中腾出大小为2的数据且转成List对象
2022-10-29 21:04:53,380 [workThread_1] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:211] - 线程被阻塞...
复制代码

可能有部分小伙伴对在生产环境如何正确使用这款组件有疑虑,对此完整版我已经开源到GitHub上(基于springboot构建内含如何正确初始化此组件以及完整的测试用例),有兴趣的小伙伴可以自取。

github地址:https://github.com/TaoZhuGongBoy/ProducerAndConsumerComponet

总结

好了这款组件介绍已接近尾声,接下来让我们一起做下总结:

☆是什么

我们说这款工具是一款基于“生产者-消费者模式”实现的组件。以前生产者必须同步调用、等待相关业务操作的处理结果后才能返回(一旦有些业务场景生产者生产的速度过快,方法内部自身业务处理又比较耗时)这时如果同步等待调用结果返回,系统整体吞吐量会极具降低。现在换了一种思路即生产者不需要同步等待业务的处理结果,当它发送一个请求后立即返回,耗时的处理由一致多个消费者线程来异步处理,加快任务整体处理速度。(异步、解耦

☆适用场景

它比较适合处理一些重要程度不是很高的数据(比如埋点日志、下载请求等),当生产者生产数据过快,业务本身处理又比较耗时,那用这个方案是比较合适的。

为什么在这里要强调重要程度不是很高这句话呢?因为BlockQueue毕竟是基于内存的数据结构,极端情况下是存在数据丢失风险的,像埋点日志、下载请求这种数据小部分丢失其实对业务影响不大。

看到这里可能有部分小伙伴会产生一个疑问:怎么看这个组件的功能跟MQ这么像。对的,功能是相似的,但这款组件是一个本地化的解决方案,目的就是为了降低引入消息队列的复杂度才设计(设计意图)。

☆怎么实现的

消费者线程在组件初始化的时候会更具指定threadNum数量完成新建。run方法一开始就进行死循环检查,根据两个条件判断:
条件一:工作线程对象内部的阻塞队列大小实际元素个数是否超过指定阈值;
条件二:当前时间和上一次任务处理时间差是否超过指定阈值,如果两个都不满足,工作线程就通过LockSupport.park(this)将自己阻塞,等待条件之一被满足,然后被唤醒。
每个工作线程内部的add方法和time方法是触发上述条件之一的入口,详细请参详架构设计部分。

写到最后

这款组件几乎囊括了并发编程领域半壁江山的技术点,能把这些散的点串起来,用好用对,着实不容易,也体现出组件作者深厚的基础技术功底。

如果你是《并发编程》的初学者亦或有几年经验的老兵,都建议好好揣摩与学习一下这款组件的架构设计与编码实现;如果在你的生产场景中你刚好也碰到类似问题与场景,那么这款组件也许能帮助你。

本文完!

640 (1).gif

关注我

如果这篇文章你看了对你有帮助或启发,麻烦点赞、关注一下作者。你的肯定是作者创作源源不断的动力。

公众号

里面不仅汇集了硬核的干货技术、还汇集了像左耳朵耗子、张朝阳总结的高效学习方法论、职场升迁窍门、软技能。希望能辅助你达到你想梦想之地!

公众号内回复关键字电子书”下载pdf格式的电子书籍(JAVAEE、Spring、JVM、并发编程、Mysql、Linux、kafka、分布式等)、“开发手册”获取阿里开发手册2本、"面试"获取面试PDF资料。

 

 

与从中间件团队窃取了这个组件,见识到了编码能力的天花板!!相似的内容:

从中间件团队窃取了这个组件,见识到了编码能力的天花板!!

大家好,我是陶朱公Boy. 今天跟大家分享一款基于“生产者消费者模式”下实现的组件。 该组件是作者偶然在翻阅公司一中间件源码的时候碰到的,觉得设计的非常精美、巧妙,花了点时间整理成文分享给大家。

我们从 CircleCI 安全事件获得的3个经验教训

CircleCI 作为业内最受欢迎的 CI/CD 平台提供商之一,有超过20万个 DevOps 团队使用其平台。该公司在今年1月在其官网报告了一起安全事件引起客户恐慌。在此事件中,有身份不明的恶意攻击者入侵了一名员工的笔记本电脑,利用恶意软件窃取了员工的 2FA 支持的单点登陆会话 cookie,使

定时任务原理方案综述 | 京东云技术团队

本文主要介绍目前存在的定时任务处理解决方案。业务系统中存在众多的任务需要定时或定期执行,并且针对不同的系统架构也需要提供不同的解决方案。京东内部也提供了众多定时任务中间件来支持,总结当前各种定时任务原理,从定时任务基础原理、单机定时任务(单线程、多线程)、分布式定时任务介绍目前主流的定时任务的基本原理组成、优缺点等。希望能帮助读者深入理解定时任务具体的算法和实现方案。

Velocity 不用愁!Velocity 系统的前端工程化之路

Velocity是一个基于Java的Web页面模版引擎。十多年前,Velocity将Java代码从Web页面中分离出来,使得开发者能够并行网页开发和Java开发。随着十年前后端分离的浪潮涌动,回首再面对这些基于Velocity的旧系统,无论是后端还是前端人员维护,都会存在诸多问题

如何使用Java创建数据透视表并导出为PDF

摘要:本文由葡萄城技术团队原创并首发。转载请注明出处:葡萄城官网,葡萄城为开发者提供专业的开发工具、解决方案和服务,赋能开发者。 前言 数据透视分析是一种强大的工具,可以帮助我们从大量数据中提取有用信息并进行深入分析。而在Java开发中,可以借助PivotTable,通过数据透视分析揭示数据中的隐藏

[转帖]微服务并不是银弹

https://www.oschina.net/translate/microservices-not-a-silver-bullet?print 让我们看一个公司/客户与前端框架技术团队之间的典型对话。 这是你从公司/客户那里听到的,技术团队因此而疯狂。当整个世界都从微服务中获益时,为什么不应该采

从PDF到OFD,国产化浪潮下多种文档格式导出的完美解决方案

前言 近年来,中国在信息技术领域持续追求自主创新和供应链安全,伴随信创上升为国家战略,一些行业也开始明确要求文件导出的格式必须为 OFD 格式。OFD 格式目前在政府、金融、税务、教育、医疗等需要文件开放、共享和长期保存的行业中广泛应用。这种趋势在未来几年内将进一步增强。 相较于 PDF,OFD 在

从0到1构造自定义限流组件

在系统高可用设计中,接口限流是一个非常重要环节,一方面是出于对自身服务器资源的保护,另一方面也是对依赖资源的一种保护措施。比如对于 Web 应用,我限制单机只能处理每秒 1000 次的请求,超过的部分直接返回错误给客户端。虽然这种做法损害了用户的使用体验,但是它是在极端并发下的无奈之举,是短暂的行为,因此是可以接受的。

从0到1构建基于自身业务的前端工具库

在实际项目开发中无论 M 端、PC 端,或多或少都有一个 utils 文件目录去管理项目中用到的一些常用的工具方法,比如:时间处理、价格处理、解析url参数、加载脚本等,其中很多是重复、基础、或基于某种业务场景的工具,存在项目间冗余的痛点以及工具方法规范不统一的问题

从2PC和容错共识算法讨论zookeeper中的Create请求

最近在读《数据密集型应用系统设计》,其中谈到了zookeeper对容错共识算法的应用。这让我想到之前参考的zookeeper学习资料中,误将容错共识算法写成了2PC(两阶段提交协议),所以准备以此文对共识算法和2PC做梳理和区分,也希望它能帮助像我一样对这两者有误解的同学。