JS Async / Await Tasks Queue
Solution 1:
You could use a Queue data structure as your base and add the special behavior in a child class. A Queue has a well-known interface of two methods: enqueue() (add a new item to the end) and dequeue() (remove the first item). In your case, dequeue() awaits the async task.
Special behavior:
- Each time a new task (e.g. fetch('url')) gets enqueued, this.dequeue() gets invoked.
- What dequeue() does:
  - if the queue is empty ➜ return false (break out of the recursion)
  - if the queue is busy ➜ return false (previous task not finished)
  - else ➜ remove the first task from the queue and run it
- on task "complete" (successful or with errors) ➜ recursive call of dequeue() (step 2), until the queue is empty.
class Queue {
  constructor() { this._items = []; }
  enqueue(item) { this._items.push(item); }
  dequeue() { return this._items.shift(); }
  get size() { return this._items.length; }
}

class AutoQueue extends Queue {
  constructor() {
    super();
    this._pendingPromise = false;
  }

  enqueue(action) {
    return new Promise((resolve, reject) => {
      super.enqueue({ action, resolve, reject });
      this.dequeue();
    });
  }

  async dequeue() {
    if (this._pendingPromise) return false;

    let item = super.dequeue();
    if (!item) return false;

    try {
      this._pendingPromise = true;
      let payload = await item.action(this);
      this._pendingPromise = false;
      item.resolve(payload);
    } catch (e) {
      this._pendingPromise = false;
      item.reject(e);
    } finally {
      this.dequeue();
    }

    return true;
  }
}
// Helper function for "fake" tasks
// Returned Promise is wrapped! (tasks should not run right after initialization)
let _ = ({ ms, ...foo } = {}) => () => new Promise(resolve => setTimeout(resolve, ms, foo));

// ... create some fake tasks
let p1 = _({ ms: 50, url: '❪𝟭❫', data: { w: 1 } });
let p2 = _({ ms: 20, url: '❪𝟮❫', data: { x: 2 } });
let p3 = _({ ms: 70, url: '❪𝟯❫', data: { y: 3 } });
let p4 = _({ ms: 30, url: '❪𝟰❫', data: { z: 4 } });

const aQueue = new AutoQueue();
const start = performance.now();
aQueue.enqueue(p1).then(({ url, data }) => console.log('%s DONE %fms', url, performance.now() - start)); // = 50
aQueue.enqueue(p2).then(({ url, data }) => console.log('%s DONE %fms', url, performance.now() - start)); // 50 + 20 = 70
aQueue.enqueue(p3).then(({ url, data }) => console.log('%s DONE %fms', url, performance.now() - start)); // 70 + 70 = 140
aQueue.enqueue(p4).then(({ url, data }) => console.log('%s DONE %fms', url, performance.now() - start)); // 140 + 30 = 170
Interactive demo: https://codesandbox.io/s/async-queue-ghpqm?file=/src/index.js
You can play around and watch the results in the console and/or the dev-tools "Performance" tab. The rest of this answer is based on it.
Explain:
enqueue() returns a new Promise that will be resolved (or rejected) at some point later. This Promise can be used to handle the response of your async task Fn.
enqueue() actually push()es an Object into the queue that holds the task Fn and the control methods for the returned Promise.
Since the returned Promise starts to run instantly, this.dequeue() is invoked each time we enqueue a new task.
With some performance.measure() calls added to our tasks, we get a good visualization of our queue:
- 1st row is our queue instance
- Newly enqueued tasks have a "❚❚ waiting.." period (3rd row) (might be < 1ms if the queue is empty)
- At some point a task is dequeued and "▶ runs.." for a while (2nd row)
Log output (console.table()):
Explain: The 1st task is enqueue()d at 2.58ms, right after queue initialization. Since our queue is empty, there is nearly no ❚❚ waiting (0.04ms ➜ ~40μs). Task runtime 13.88ms ➜ dequeue
Class Queue is just a wrapper for native Array Fn´s!
You can of course implement this in one class. I just want to show that you can build what you want from already known data structures. There are some good reasons for not using an Array:
- A Queue data structure is defined by an interface of two public methods. Using an Array might tempt others to use native Array methods on it, like .reverse(), which would break the definition. Also, enqueue() and dequeue() are much more readable than push() and shift().
- If you already have an out-implemented Queue class, you can extend from it (re-usable code).
- You can replace the item Array in class Queue by another data structure: a "Doubly Linked List", which reduces the code complexity of Array.shift() from O(n) [linear] to O(1) [constant]. (➜ better time complexity than the native Array Fn!) (➜ final demo; see the sketch below)
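A minimal sketch of that swap, assuming a simplified singly linked variant (a doubly linked list works the same way for this use case); the names Node, _head and _tail are illustrative and not part of the original demo:

// Linked-list based Queue: same public interface as before,
// but dequeue() no longer re-indexes an Array (O(1) instead of O(n)).
class Node {
  constructor(value) { this.value = value; this.next = null; }
}

class LinkedQueue {
  constructor() { this._head = null; this._tail = null; this._size = 0; }

  enqueue(item) {
    const node = new Node(item);
    if (this._tail) this._tail.next = node; else this._head = node;
    this._tail = node;
    this._size++;
  }

  dequeue() {
    if (!this._head) return undefined;
    const { value } = this._head;
    this._head = this._head.next;
    if (!this._head) this._tail = null;
    this._size--;
    return value;
  }

  get size() { return this._size; }
}

AutoQueue could extend LinkedQueue instead of Queue without any other change, since it only relies on enqueue(), dequeue() and size.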
Code limitations
This AutoQueue class is not limited to async functions. It handles anything that can be called like await item.action(this):
- let task = queue => {..} ➜ sync functions
- let task = async queue => {..} ➜ async functions
- let task = queue => new Promise(resolve => setTimeout(resolve, 100)) ➜ new Promise()

Note: We already call our tasks with await, where await wraps the response of the task into a Promise. Nr 2 (async function) always returns a Promise on its own, and the await call just wraps that Promise into another Promise, which is slightly less efficient. Nr 3 is fine: returned Promises will not get wrapped again by await.
This is how async functions are executed: (source)
- The result of an async function is always a Promise p. That Promise is created when starting the execution of the async function.
- The body is executed. Execution may finish permanently via return or throw. Or it may finish temporarily via await, in which case execution will usually continue later on.
- The Promise p is returned.
The following code demonstrates how that works:
async function asyncFunc() {
  console.log('asyncFunc()'); // (A)
  return 'abc';
}

asyncFunc()
  .then(x => console.log(`Resolved: ${x}`)); // (B)
console.log('main'); // (C)

// Output:
// asyncFunc()
// main
// Resolved: abc
You can rely on the following order:
- Line (A): the async function is started synchronously. The async function’s Promise is resolved via return.
- Line (C): execution continues.
- Line (B): Notification of Promise resolution happens asynchronously.
Read more: "Callable values" Read more: "Async functions"
Performance limitations
Since AutoQueue is limited to handling one task after the other, it might become a bottleneck in our app. Limiting factors are:
- Tasks per time ➜ frequency of new enqueue()d tasks
- Runtime per task ➜ blocking time in dequeue() until the task completes

1. Tasks per time
This is our responsibility! We can get the current size of the queue at any time: size = queue.size. Your outer script needs a "fail-over" case for a steadily growing queue (check the "Stacked wait times" section).
You want to avoid a "queue overflow" like this, where the average/mean waitTime increases over time.
+-------+----------------+----------------+----------------+----------------+
| tasks | enqueueMin(ms) | enqueueMax(ms) | runtimeMin(ms) | runtimeMax(ms) |
+-------+----------------+----------------+----------------+----------------+
|    20 |              0 |            200 |             10 |             30 |
+-------+----------------+----------------+----------------+----------------+
- ➜ Task 20/20 waits 195ms until its execution starts.
- ➜ From the time our last task was randomly enqueued, it takes another ~232ms until all tasks are resolved.
2. Runtime per task
This one is harder to deal with. (Waiting for a fetch() cannot be improved; we need to wait until the HTTP request has completed.)
Maybe your fetch() tasks rely on each other's responses, and a long runtime will block the others.
But there are some things we could do:
- Maybe we could cache responses ➜ reduce the runtime of the next enqueue (see the sketch below).
- Maybe we fetch() from a CDN and have an alternative URI we could use. In this case we can return a new Promise from our task that will be resolved before the next task is enqueue()d (see "Error handling"):
  queue.enqueue(queue => Promise.race([fetch('url1'), fetch('url2')]));
- Maybe you have some kind of "long polling" or periodic AJAX task that runs every x seconds and cannot be cached. Even if you cannot reduce the runtime itself, you could record the runtimes, which would give you an approximate estimation of the next run. Maybe you can swap long-running tasks to other queue instances.
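A minimal sketch of such a response cache, assuming a simple in-memory Map keyed by URL and a JSON response; the names responseCache and cachedFetchTask are illustrative, not part of the original answer:

const responseCache = new Map(); // url -> resolved payload

// Wrap a fetch-like task so repeated enqueues of the same URL resolve instantly.
const cachedFetchTask = url => async queue => {
  if (responseCache.has(url)) return responseCache.get(url);
  const res = await fetch(url);
  const payload = await res.json();
  responseCache.set(url, payload);
  return payload;
};

// usage: aQueue.enqueue(cachedFetchTask('https://example.com/api'));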
Balanced AutoQueue
What is an "efficient" Queue? Your first thought might be something like:

The most efficient Queue handles the most tasks in the shortest period of time?

Since we cannot improve our task runtime, can we lower the waiting time? The example is a queue with zero (~0ms) waiting time between tasks.
Hint: In order to compare our next examples we need some base stats that will not change:
+-------+---------------------------------+-------------------------------------+
| count | random fake runtime for tasks   | random enqueue() offset for tasks   |
+-------+----------------+----------------+------------------+------------------+
| tasks | runtimeMin(ms) | runtimeMax(ms) | msEnqueueMin(ms) | msEnqueueMax(ms) |
+-------+----------------+----------------+------------------+------------------+
|   200 |             10 |             30 |                0 |             4000 |
+-------+----------------+----------------+------------------+------------------+
Avg. task runtime: ⇒ (10ms + 30ms) / 2 = 20ms
Total time: ⇒ 20ms * 200 = 4000ms ≙ 4s
➜ We expect our queue to be resolved after ~4s
➜ For consistent enqueue() frequency we set msEnqueueMax to 4000
AutoQueue finished the last dequeue() after ~4.12s (^^ see tooltip), which is ~120ms longer than our expected 4s.
Hint: There is a small "Log" block (~0.3ms) after each task, where I build/push an Object with log marks to a global Array for the console.table() log at the end. This explains 200 * 0.3ms = 60ms. The missing 60ms are untracked (you see the small gap between the tasks) ➜ ~0.3ms/task for our test loop and probably some delay from the open Dev-Tools.
We come back to these timings later.
The initialization code for our queue:
const queue = new AutoQueue();
// .. get 200 random Int numbers for our task "fake" runtimes [10-30]ms
let runtimes = Array.from({ length: 200 }, () => rndInt(10, 30));
let i = 0;

let enqueue = queue => {
  if (i >= 200) {
    return queue; // break out condition
  }
  i++;
  queue
    .enqueue(
      new Task({ // generate a "fake" task with a rand. runtime
        ms: runtimes[i - 1],
        url: _(i)
      })
    )
    .then(payload => {
      enqueue(queue);
    });
};

enqueue(queue); // start recursion
We recursively enqueue() our next task right after the previous one finished. You might have noticed the analogy to a typical Promise.then() chain, right?

Hint: We don´t need a Queue if we already know the order and total number of tasks to run in sequence. We can use a Promise chain and get the same results (see the sketch below).
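A minimal sketch of that equivalent Promise chain, assuming the same fake task helpers p1..p4 from the demo above:

// Known, fixed sequence: no Queue needed, a plain Promise chain gives the same order.
p1()
  .then(r1 => p2())
  .then(r2 => p3())
  .then(r3 => p4())
  .then(r4 => console.log('all four tasks done, in order'));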
Sometimes we don´t know all the next steps right at the start of our script. You might need more flexibility, and the next task we want to run depends on the response of the previous task. Maybe your app relies on a REST API (multiple endpoints) and you are limited to max X simultaneous API request(s). We cannot spam the API with requests from all over your app. You don´t even know when the next request gets enqueue()d (e.g. API requests are triggered by click() events?).
Ok, for the next example I changed the initialization code a bit:
We now enqueue 200 tasks randomly within a [0-4000ms] period. To be fair, we reduced the range by 30ms (max task runtime) to [0-3970ms]. Now our randomly filled queue has a chance to stay inside the 4000ms limit.
What we can get out of the Dev-Tools performance log:
- Random enqueue() leads to a big number of "waiting" tasks. Makes sense: since we enqueued all tasks within the first ~4000ms, they must overlap somehow. Checking the table output we can verify: the max queue.size is 22, at the time task 170/200 was enqueued.
- Waiting tasks are not evenly distributed. Right after the start there are even some idle sections. Because of the random enqueue() it is unlikely to get a 0ms offset for our first task. The ~20ms runtime of each task leads to the stacking effect over time.
- We can sort tasks by "wait ms" (see screen): the longest waiting time was >400ms. There might be a relation between queue.size (column: sizeOnAdd) and wait ms (see next section).
- Our AutoQueue completed the last dequeue() ~4.37s after its initialization (check the tooltip in the "Performance" tab). An average runtime of 20.786ms/task (expected: 20ms) gives us a total runtime of 4157.13ms (expected: 4000ms ≙ 4s). We still have our "Log" blocks and the exec. time of our test script itself (~120ms). Still ~37ms longer? Summing up all the idle "gaps" right at the start explains the missing ~37ms.
Back to our initial "definition":

The most efficient Queue handles the most tasks in the shortest period of time?

Assumption: Apart from the random offset tasks get enqueue()d with in the previous example, both queues handled the same number of tasks (equal avg. runtime) within the same period of time. Neither the waiting time of an enqueued task nor the queue.size affects the total runtime. Both have the same efficiency?

Since a Queue, by its nature, shrinks our coding possibilities, it is best not to use a Queue if we talk about efficient code (tasks per time).
A queue helps us to straighten tasks in an async environment into a sync pattern. That is exactly what we want. ➜ "Run an unknown sequence of tasks in a row".
If you find yourself asking things like: "If a new task gets enqueued into an already filled queue, the time we have to wait for our result is increased by the runtimes of the others. That´s less efficient!", then you are doing it wrong:
- You either enqueue tasks that have no dependency (in some way) on each other (logical or programmatic dependency), or there is a dependency which would not increase the total runtime of our script.
- We have to wait for the others anyway.
Stacked wait times
We have seen a peak wait time of 461.05ms for a task before it runs. Wouldn't it be great if we could forecast the wait time for a task before we decide to enqueue it?
At first we analyse the behavior of our AutoQueue class over longer times.
(re-post screens)
We can build a chart from the console.table() output:
Beside the wait time of a task, we can see the random [10-30ms] runtime and 3 curves, representing the current queue.size, recorded at the time a task ..
- .. is enqueue()d
- .. starts to run (dequeue())
- .. has finished (right before the next dequeue())
2 more runs for comparison (similar trend):
- chart run 2: https://i.imgur.com/EjpRyuv.png
- chart run 3: https://i.imgur.com/0jp5ciV.png
Can we find dependencies among each other?
If we could find a relation between any of those recorded chart lines, it might help us to understand how a queue behaves over time (➜ constantly filled up with new tasks).

Excursus: What is a relation? We are looking for an equation that projects the wait ms curve onto one of the 3 queue.size records. This would prove a direct dependency between both.
For our last run, we changed our start parameters:
- Task count: 200 ➜ 1000 (5x)
- msEnqueueMax: 4000ms ➜ 20000ms (5x)

+-------+---------------------------------+-------------------------------------+
| count | random fake runtime for tasks   | random enqueue() offset for tasks   |
+-------+----------------+----------------+------------------+------------------+
| tasks | runtimeMin(ms) | runtimeMax(ms) | msEnqueueMin(ms) | msEnqueueMax(ms) |
+-------+----------------+----------------+------------------+------------------+
|  1000 |             10 |             30 |                0 |            20000 |
+-------+----------------+----------------+------------------+------------------+

Avg. task runtime: ⇒ (10ms + 30ms) / 2 = 20ms (like before)
Total time: ⇒ 20ms * 1000 = 20000ms ≙ 20s
➜ We expect our queue to be resolved after ~20s
➜ For a consistent enqueue() frequency we set msEnqueueMax to 20000
(interactive chart: https://datawrapper.dwcdn.net/p4ZYx/2/)
We see the same trend. wait ms increases over time (nothing new). Since our 3 queue.size lines at the bottom were drawn into the same chart (the Y-axis has a ms scale), they are barely visible. Quick switch to a logarithmic scale for better comparison:
(interactive chart: https://datawrapper.dwcdn.net/lZngg/1/)
The two dotted lines for queue.size [on start] and queue.size [on end] pretty much overlap each other and fall down to "0" once our queue gets empty at the end.
queue.size [on add] looks very similar to the wait ms line. That is what we need.
{queue.size [on add]} * X = {wait ms}
⇔ X = {wait ms} / {queue.size [on add]}
This alone does not help us at runtime, because wait ms is unknown for a newly enqueued task (it has not run yet). So we still have 2 unknown variables: X and wait ms. We need another relation that helps us.
First of all, we print our new ratio {wait ms} / {queue.size [on add]} into the chart (light green), and its mean/average (light green horizontal dashed line). This is pretty close to 20ms (the avg. run ms of our tasks), right?
Switch back to a linear Y-axis and set its "max scale" to 80ms to get a better view of it. (Hint: wait ms is now beyond the viewport.)
(interactive chart: https://datawrapper.dwcdn.net/Tknnr/4/)
Back to the random runtimes of our tasks (dot cloud). We still have our "total mean" of 20.72ms (dark green dashed horizontal line). We can also calc the mean of our previous tasks at runtime (e.g. task 370 gets enqueued ➜ what is the current mean runtime of tasks [1,..,369]?). But we could even be more precise:
The more tasks we enqueue, the less impact each has on the total "mean runtime". So let´s just calc the "mean runtime" of e.g. the last 50 tasks, which leads to a consistent impact of 1/50 per task on the "mean runtime". ➜ Peak runtimes get straightened out and the trend (up/down) is taken into account. (dark green horizontal path curve next to the light green one from our 1st equation)
Things we can do now:
- We can eliminate X from our 1st equation (light green). ➜ X can be expressed by the "mean runtime" of the previous n (e.g. 50) tasks (dark green). Our new equation just depends on variables that are known at runtime, right at the point of enqueue:
  // mean runtime from prev. n tasks:
  X = {[taskRun[-50], .., taskRun[-2], taskRun[-1]] / n} ms
  // .. replace X in the 1st equation:
  ⇒ {wait ms} = {queue.size [on add]} * {[runtime[-50], .., runtime[-2], runtime[-1]] / n} ms
- We can draw a new diagram curve in our chart and check how close it is compared to the recorded wait ms (orange) (see the sketch below).
(interactive chart: https://datawrapper.dwcdn.net/LFp1d/2/)
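A minimal sketch of that forecast, assuming we record each finished task's runtime in a small rolling window; the recordRuntime/forecastWaitMs helpers and the window size are illustrative assumptions, not part of the original demo:

const WINDOW = 50;         // number of recent runtimes to average (illustrative)
const recentRuntimes = []; // rolling window of the last WINDOW task runtimes (ms)

function recordRuntime(ms) {
  recentRuntimes.push(ms);
  if (recentRuntimes.length > WINDOW) recentRuntimes.shift();
}

// Forecast: {wait ms} ≈ {queue.size [on add]} * {mean runtime of the last n tasks}
function forecastWaitMs(queue) {
  if (!recentRuntimes.length) return 0; // no history yet
  const mean = recentRuntimes.reduce((a, b) => a + b, 0) / recentRuntimes.length;
  return queue.size * mean;
}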
Conclusion
We can forecast the wait time for a task before it gets enqueued, given that the runtimes of our tasks can be determined somehow. So it works best in situations where you enqueue tasks of the same type/function:
Use case: An AutoQueue instance is filled up with render tasks for your UI components. Render time might not change that much (compared to fetch()). Maybe you render 1000 location marks on a map. Each mark is an instance of a class with a render() Fn.
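A minimal sketch of that use case, assuming a hypothetical Mark class with a render() method (the class, its fields and the simulated render delay are illustrative):

// Assumed Mark class: renders one location mark, returns a Promise when done.
class Mark {
  constructor(lat, lng) { this.lat = lat; this.lng = lng; }
  render() {
    // real drawing work would happen here; simulated with a short timeout
    return new Promise(resolve => setTimeout(resolve, 15, { lat: this.lat, lng: this.lng }));
  }
}

const renderQueue = new AutoQueue();
const marks = Array.from({ length: 1000 }, () => new Mark(Math.random() * 180 - 90, Math.random() * 360 - 180));

// Enqueue one render task per mark; they run strictly in sequence.
marks.forEach(mark => renderQueue.enqueue(() => mark.render()));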
Tips
- Queues are used for various tasks. ➜ Implement dedicated Queue class variations for different kinds of logic (don't mix different logic in one class).
- Check all tasks that might be enqueued to the same AutoQueue instance (now or in the future); they could be blocked by all the others.
- An AutoQueue will not improve the runtime; at best it will not get lowered.
- Use different AutoQueue instances for different Task types.
- Monitor the size of your AutoQueue, particularly ..
  - .. on heavy usage (high frequency of enqueue())
  - .. on long or unknown task runtimes
- Check your error handling. Errors inside your tasks will just reject their returned promise on enqueue (promise = queue.enqueue(..)) and will not stop the dequeue process. You can handle errors..
  - .. inside your tasks ➜ try{..} catch(e){ .. }
  - .. right after it (before the next) ➜ return new Promise()
  - .. "async" ➜ queue.enqueue(..).catch(e => {..})
  - .. "global" ➜ error handler inside the AutoQueue class
- Depending on the implementation of your Queue you might watch the queue.size. An Array, filled up with 1000 tasks, is less effective than a decentralized data structure like the "Doubly Linked List" I used in the final code.
- Avoid recursion hell. (It is OK to use tasks that enqueue() others.) But it is no fun to debug an AutoQueue where tasks are dynamically enqueue()d by others in an async environment.
- At first glance a Queue might solve a problem (at a certain level of abstraction). However, in most cases it shrinks existing flexibility. It adds an additional "control layer" to our code (which, in most cases, is what we want); at the same time, we sign a contract to accept the strict rules of a Queue. Even if it solves a problem, it might not be the best solution.
Add more features [basic]
Stop "auto dequeue()" on enqueue(): Since our AutoQueue class is generic and not limited to long running HTTP requests, you can enqueue() any function that has to run in sequence, even 3min running functions for e.g. "store updates for modules". You cannot guarantee that, when you enqueue() 100 tasks in a loop, the previously added task is not already dequeue()d. You might want to prevent enqueue() from calling dequeue() until all were added:

enqueue(action, autoDequeue = true) { // new
  return new Promise((resolve, reject) => {
    super.enqueue({ action, resolve, reject });
    if (autoDequeue) this.dequeue(); // new
  });
}
.. and then call queue.dequeue() manually at some point.

Control methods: stop / pause / start: You can add more control methods. Maybe your app has multiple modules that all try to fetch() their resources on page load. An AutoQueue() works like a Controller. You can monitor how many tasks are "waiting.." and add more controls:

class AutoQueue extends Queue {
  constructor() {
    super();
    this._stop = false;  // new
    this._pause = false; // new
  }

  enqueue(action) { .. }

  async dequeue() {
    if (this._pendingPromise) return false;
    if (this._pause) return false; // new
    if (this._stop) {              // new
      this._items = [];
      this._stop = false;
      return false;
    }
    let item = super.dequeue();
    ..
  }

  stop() {  // new
    this._stop = true;
  }

  pause() { // new
    this._pause = true;
  }

  async start() { // new
    this._stop = false;
    this._pause = false;
    return await this.dequeue();
  }
}
Forward response: You might want to process the "response/value" of a task in the next task. It is not guaranteed that our prev. task has not already finished at the time we enqueue the 2nd task. Therefore it might be best to store the response of the prev. task inside the class and forward it to the next: this._payload = await item.action(this._payload) (see the sketch below).
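A minimal sketch of that forwarding inside dequeue(), assuming we keep the last result in a new _payload field (the field name is an illustrative choice, not from the original code):

async dequeue() {
  if (this._pendingPromise) return false;
  let item = super.dequeue();
  if (!item) return false;
  try {
    this._pendingPromise = true;
    // forward the previous task's result as the argument of the next task
    this._payload = await item.action(this._payload);
    this._pendingPromise = false;
    item.resolve(this._payload);
  } catch (e) {
    this._pendingPromise = false;
    item.reject(e);
  } finally {
    this.dequeue();
  }
  return true;
}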
Error handling
Thrown errors inside a task Fn reject the promise returned by enqueue() and will not stop the dequeue process. You might want to handle an error before the next task starts to run:

queue.enqueue(queue => myTask()).catch(e => { .. }); // async error handling

queue.enqueue(queue =>
  myTask()
    .then(payload => otherTask(payload)) // .. inner task
    .catch(() => { .. })                 // sync error handling
);
Since our Queue is dumb and just await´s our tasks to be resolved (item.action(this)), no one prevents you from returning a new Promise() from the currently running task Fn. It will be resolved before the next task gets dequeued.
You can throw new Error() inside task Fn´s and handle them "outside"/after the run: queue.enqueue(..).catch().
You can easily add custom error handling inside the dequeue() method that calls this.stop() to clear the "on hold" (enqueued) tasks (see the sketch below).
You can even manipulate the queue from inside your task functions. Check: await item.action(this) invokes the task with this and gives access to the Queue instance (passing this is optional). There are use cases where task Fn´s should not be able to do that.
Add more features [advanced]
... text limit reached :D
more: https://gist.github.com/exodus4d/6f02ed518c5a5494808366291ff1e206
Read more
- Blog: "Asynchronous Recursion with Callbacks, Promises and Async"
- Book: "Callable values"
- Book: "Async functions"
Solution 2:
You could save the previous pending promise and await it before calling the next fetch.
// fake fetch for demo purposes only
const fetch = (url, options) => new Promise(resolve => setTimeout(resolve, 1000, { url, options }));

// task executor
const addTask = (() => {
  let pending = Promise.resolve();

  const run = async (url, options) => {
    try {
      await pending;
    } finally {
      return fetch(url, options);
    }
  };

  // update the pending promise so that the next task can await it
  return (url, options) => (pending = run(url, options));
})();
addTask('url1', {options: 1}).then(console.log)
addTask('url2', {options: 2}).then(console.log)
addTask('url3', {options: 3}).then(console.log)
Solution 3:
Here's one I made earlier, also available in TypeScript.
function createAsyncQueue(opts = { dedupe: false }) {
  const { dedupe } = opts
  let queue = []
  let running

  const push = task => {
    if (dedupe) queue = []
    queue.push(task)
    if (!running) running = start()
    return running.finally(() => {
      running = undefined
    })
  }

  const start = async () => {
    const res = []
    while (queue.length) {
      const item = queue.shift()
      res.push(await item())
    }
    return res
  }

  return { push, queue, flush: () => running || Promise.resolve([]) }
}
// ----- tests below 👇
const sleep = ms => new Promise(r => setTimeout(r, ms))

async function test1() {
  const myQueue = createAsyncQueue()
  myQueue.push(async () => {
    console.log(100)
    await sleep(100)
    return 100
  })
  myQueue.push(async () => {
    console.log(10)
    await sleep(10)
    return 10
  })
  console.log(await myQueue.flush())
}

async function test2() {
  const myQueue = createAsyncQueue({ dedupe: true })
  myQueue.push(async () => {
    console.log(100)
    await sleep(100)
    return 100
  })
  myQueue.push(async () => {
    console.log(10)
    await sleep(10)
    return 10
  })
  myQueue.push(async () => {
    console.log(9)
    await sleep(9)
    return 9
  })
  // only 100 and 9 will be executed
  // concurrent executions will be deduped
  console.log(await myQueue.flush())
}

test1().then(test2)
Example usage:
const queue = createAsyncQueue()
const task1 = async () => {
await fetchItem()
}
queue.push(task1)
const task2 = async () => {
await fetchItem()
}
queue.push(task2)
// task1 will be guaranteed to be executed before task2