- A+
这里给大家分享我在网上总结出来的一些知识,希望对大家有所帮助
前言
前两天我的新同事告诉我一个困扰着他的问题,就是低代码平台中存在很多模块,这些模块的渲染是由模块自身处理的,简言之就是组件请求了自己的数据,一个两个模块还好,要是一次请求了几十个模块,就会出现请求阻塞的问题,而且模块的请求都特别大。
大量的并发请求会导致网络拥塞和带宽限制。特别是当网络带宽有限时,同时发送大量请求可能会导致请求之间的竞争,从而导致请求的响应时间延长。
因此模块的加载就很不顺畅。。。
为了解决这个问题我设计了一个关于前端实现并发请求限制的方案,下面将详细解释这个并发请求限制的方案及实现源码。
核心思路及简易实现
一、收集需要并发的接口列表,并发数,接口调用函数。
二、遍历arr,对于每一项,创建一个promise实例存储到resArr中,创建的时候就已经开始执行了。
三、将创建的promise传入的实例数组中,对于每一项的promise设置其then操作,并将其存储到running数组中,作为执行中的标识。
四、当then操作触发之后则将running中的对应这一项删除,执行中的数组减一。
五、在遍历的回调函数最后判断当前是否超出阈值,当数量达到限制时开始批量执行,用await去处理异步,处理完一个即跳走,重新往running中注入新的实例。
async function asyncLimit(limitNum, arr, fn) { let resArr = []; // 所有promise实例 let running = []; // 执行中的promise数组 for (const item of arr) { const p = Promise.resolve(fn(item)); // 遍历arr,对于每一项,创建一个promise实例存储到resArr中,创建的时候就已经开始执行了 resArr.push(p); if (arr.length >= limitNum) { // 对于每一项设置其then操作,并将其存储到running数组中,作为执行中的标识,当then操作触发之后则将running中的对应这一项删除,执行中的数组减一 const e = p.then(() => running.splice(running.indexOf(e), 1)); running.push(e); if (running.length >= limitNum) { // 当数量达到限制时开始批量执行,处理完一个即跳走,重新往running中注入新的实例 await Promise.race(running); } } } return Promise.allSettled(resArr); }
用例:
fn = (item) => { return new Promise((resolve) => { console.log("开始",item); setTimeout(() => { console.log("结束", item); resolve(item); }, item) }); }; asyncLimit(2, [1000, 2000, 5000, 2000, 3000], fn)
注:但是这里的实现太过简陋,在真正的业务场景中往往没有这样使用场景,因此我对着段代码封装成一个符合前端使用的并发限制模块,下面是完整可用的代码实现
完整实现及源码用例
首先,让我们来看一下代码及用例:
let targetArray = []; // 目标调用数组 let resultArray = []; // 结果数组 let runningPromise = null; // 正在运行的 Promise let limitNum = 0; // 最大并发数 let defaultFn = (value) => value; // 默认处理函数 function asyncInit(limit, fn) { limitNum = limit; defaultFn = fn; } async function asyncLimit(arr) { const promiseArray = []; // 所有 Promise 实例 const running = []; // 正在执行的 Promise 数组 for (const item of arr) { const p = Promise.resolve((item.fn || defaultFn)(item.value || item)); // 调用元素的处理函数 promiseArray.push(p); if (arr.length >= limitNum) { const e = p.then(() => running.splice(running.indexOf(e), 1)); running.push(e); if (running.length >= limitNum) { await Promise.race(running); } } } return Promise.allSettled(promiseArray); } function asyncExecute(item) { targetArray.push(item); if (!runningPromise) { runningPromise = Promise.resolve().then(()=>{ asyncLimit(targetArray).then((res) => { resultArray.push(...res); targetArray = []; runningPromise = null; }); }) } }
这里提供了一个并发模块的文件。
简单用例:
asyncInit(3, (item) => { return new Promise((resolve) => { console.log("开始",item); setTimeout(() => { console.log("结束", item); resolve(item); }, item) }); }) asyncExecute({value: 1000}) asyncExecute({value: 2000}) asyncExecute({value: 5000}) asyncExecute({value: 2000}) asyncExecute({value: 3000})
效果:
注:可以看到我们在使用的时候只需要先初始化最大并发数和默认调用函数,即可直接调用asyncExecute去触发并发请求,而且通过源码我们可以看到如果 asyncExecute 的参数可以自定义调用函数,及传入的对象中包含fn即可。
重点: 因为这些内容都被抽离成一个文件,所以我们可以导出asyncExecute
这个函数然后业务侧不同位置都可以通过这个函数去发起请求,这样就能实现项目中所有请求的并发限制。
代码解释
这段代码实现了一个前端并发限制的机制。让我们逐步解释其中的关键部分。
第一步
我们定义了一些变量,包括目标调用数组 targetArray
、结果数组 resultArray
、正在运行的 Promise runningPromise
、最大并发数 limitNum
和默认处理函数 defaultFn
。
第二步
定义 asyncInit
函数,用于初始化并发限制的最大数和默认处理函数。通过调用该函数,我们可以设置相关并发限制的参数。
第三步
然后,我们定义了 asyncLimit
函数,用于实现并发限制的核心逻辑。
在这个函数中,我们遍历传入的数组 arr
,对每个元素执行处理函数,并将返回的 Promise 实例存储在 promiseArray
数组中。
同时,我们使用 running
数组来跟踪正在执行的 Promise 实例。
如果当前正在执行的 Promise 实例数量达到最大并发数,我们使用 Promise.race
方法等待最先完成的 Promise,以确保并发数始终保持在限制范围内。(这里对应的就是核心思路及简易实现
中的代码)
注:如果要实现异步并发,我们只要保证我们的接口存在于传入的数组即arr
中即可。
第四步
定义 asyncExecute
函数,用于触发异步操作的执行。
当调用 asyncExecute
函数时,我们将目标元素添加到 targetArray
数组中,这个targetArray就是异步并行的接口队列,只要把这个传入到asyncLimit中就能实现异步并行。
检查是否有正在运行的 runningPromise
。
(runningPromise的
作用:
判断当前是否已经有运行中的asyncLimit
)
如果有那么我们只需要继续往targetArray
中加入数据即可,沿用之前的asyncLimit
即之前的Promise 链。
如果没有说明asyncLimit
函数已经执行完了,我们要新开一个asyncLimit
函数去完成我们的并行限制。调用 asyncLimit
函数来处理目标数组中的元素,并基于此创建一个新的 Promise 链。
处理完成后,我们将结果存储在 resultArray
中,并重置目标数组和运行的 Promise。
总结
异步并行逻辑交由asyncLimit
处理即可。
使用上来说,就只需要使用到接口的时候,调用asyncExecute
往 targetArray
加数据就行,默认会直接执行 asyncLimit
并创建一个promise链。
当我们往里面加一项promise链就会对应的多一项,当我们promise链执行完之后我们就会重置targetArray
和runningPromise
。
下次调用asyncExecute
时,如果runningPromise
不存在就重新走上面的逻辑,即直接执行 asyncLimit
并创建一个promise链,当runningPromise
存在的情况下,每次使用asyncExecute
往targetArray
里面push参数即可。