📜  Node.js async.queue() Method

📅  Last modified: 2022-05-13 01:56:35.012000             🧑  Author: Mango


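The async module is designed for working with asynchronous JavaScript in Node.js. async.queue returns a queue object that can process tasks concurrently, i.e. several items at a time.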

How to use async.queue?

  • Step 1: Create a package.json file by running the following command.
npm init
  • Step 2: Install the async module with the following command.
npm i async
  • Step 3: Import the async module as follows.
const async = require('async')
  • Step 4: Create a queue with async.queue. The syntax is:
const queue = async.queue(worker, concurrency)

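The function passed as the first argument (the worker) is executed for each element added to the queue. The concurrency value, passed as the second argument, tells the queue how many elements may be processed at the same time.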
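To make the role of the concurrency value concrete, here is a hand-rolled sketch in plain JavaScript. It is not the async library's implementation: `createQueue`, `sketchQueue` and the other names are hypothetical and exist only for this illustration. It shows a queue that never runs more than `concurrency` workers at once.

```javascript
// Hypothetical, simplified stand-in for async.queue (illustration only).
function createQueue(worker, concurrency) {
  const pending = [];   // tasks waiting for a free slot
  let running = 0;      // tasks currently inside the worker
  let maxRunning = 0;   // highest number of simultaneous workers observed

  function next() {
    // Start tasks until the concurrency limit is reached.
    while (running < concurrency && pending.length > 0) {
      const { task, callback } = pending.shift();
      running += 1;
      maxRunning = Math.max(maxRunning, running);
      worker(task, (error, result) => {
        running -= 1;
        callback(error, result);
        next(); // a slot is free again
      });
    }
  }

  return {
    push(task, callback = () => {}) {
      pending.push({ task, callback });
      // Defer so that pushes made in the same tick are all queued first.
      setImmediate(next);
    },
    maxRunning: () => maxRunning,
  };
}

// Worker that doubles its input after 10 ms; at most 2 run at a time.
const sketchQueue = createQueue(
  (n, done) => setTimeout(() => done(null, n * 2), 10),
  2
);

const sketchResults = new Promise((resolve) => {
  const results = [];
  [1, 2, 3, 4].forEach((n) =>
    sketchQueue.push(n, (error, doubled) => {
      results.push(doubled);
      if (results.length === 4) resolve(results);
    })
  );
});

sketchResults.then((results) => {
  console.log(results);                  // [ 2, 4, 6, 8 ]
  console.log(sketchQueue.maxRunning()); // 2
});
```

With a concurrency of 1 the same four tasks would run strictly one after another, which is the setting the examples in this article use.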

Example: The following snippet shows how a queue is defined.

Javascript
// Defining The queue
const queue = async.queue((task, completed) => {
    // Here task is the current element being
    // processed and completed is the callback function
     
    console.log("Currently Busy Processing Task " + task);
     
    // Simulating a complex process.
    setTimeout(()=>{
        // Number of elements to be processed.
        const remaining = queue.length();
        completed(null, {task, remaining});
    }, 1000);
 
}, 1);
 
// The concurrency value is set to 1, which means
// that only one element is processed at a time.



Important methods and properties of async.queue:

push(element, callback): The push method adds an element to the tail of the queue. The following code demonstrates how push works.

Javascript

// push takes two arguments: the item to add and a callback.
// The callback receives an error (if any) and the result object
// ({task, remaining}) that the worker passed to completed().
 
queue.push(item, (error, result) => {
  if (error) {
      console.log(`An error occurred while processing task ${item}`);
  } else {
      const {task, remaining} = result;
      console.log(`Finished processing task ${task}. ${remaining} tasks remaining`);
  }
});

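length(): The length method returns the number of elements currently waiting in the queue. Elements that have already been handed to the worker are not counted. The following code demonstrates how length works.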

Javascript

console.log(queue.length());

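started property: The started property is a boolean that indicates whether the queue has started processing data. The following code demonstrates how the started property works.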

Javascript

// Returns true if the queue has started processing the data else false
console.log(queue.started)

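unshift(element, callback): The unshift method is similar to push, but the element is added to the head of the queue, which is useful when it should be processed before the elements already waiting. The following code demonstrates how unshift works: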

Javascript

// unshift takes two arguments: the item to add and a callback.
// The callback receives an error (if any) and the result object
// ({task, remaining}) that the worker passed to completed().
 
queue.unshift(item, (error, result) => {
  if (error) {
   console.log(`An error occurred while processing task ${item}`);
  } else {
   const {task, remaining} = result;
   console.log(`Finished processing task ${task}. ${remaining} tasks remaining`);
  }
});

drain() method: The drain method registers a callback that runs once the queue has finished processing all tasks. The following code demonstrates how drain works.

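Note: In async v3, drain is a method that takes the callback as its argument (queue.drain(callback)); in v2 it was a property that had to be assigned (queue.drain = callback). Any function can be passed, not only an arrow function.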

Javascript

// Executes when the queue is done processing all the items
queue.drain(() => {
    console.log('Successfully processed all items');
})

pause() method: The pause method suspends the processing of queued elements until resume is called. The following code demonstrates how pause works.

Javascript

// Pauses the execution of the queue
queue.pause()

resume() method: The resume method resumes the processing of queued elements. The following code demonstrates how resume works.

Javascript

// Resumes the execution of the queue
queue.resume()

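kill() method: The kill method removes all remaining elements from the queue, removes the drain callback, and forces the queue into an idle state. The following code demonstrates how kill works.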

Javascript

// Forces the queue into idle mode
// and removes the drain callback
queue.kill()

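idle() method: The idle method returns a boolean that indicates whether the queue is idle or currently processing something. The following code demonstrates how idle works.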

Javascript

// Returns whether the queue is idle or not
queue.idle()

Complete code: The following code is a complete demonstration of async.queue in practice.

Javascript

// Importing the async module
const async = require('async');
 
// Creating a tasks array
const tasks = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
 
// Defining the queue
const queue = async.queue((task, completed) => {
    console.log("Currently Busy Processing Task " + task);
     
    // Simulating a complex task
    setTimeout(() => {
        // The number of tasks still waiting to be processed
        const remaining = queue.length();
        completed(null, {task, remaining});
    }, 1000);
 
}, 1); // The concurrency value is 1
 
// Nothing has been added yet, so the queue has not started
console.log(`Did the queue start ? ${queue.started}`);
 
// Adding each task to the queue
tasks.forEach((task) => {
    // Callback invoked when this task finishes; result is the
    // {task, remaining} object passed to completed() by the worker
    const onFinished = (error, result) => {
        if (error) {
            console.log(`An error occurred while processing task ${task}`);
        } else {
            console.log(`Finished processing task ${result.task}. ${result.remaining} tasks remaining`);
        }
    };
 
    if (task === 5) {
        // Adding the 5th task to the head of the
        // queue as it is deemed important by us
        queue.unshift(task, onFinished);
    } else {
        // Adding the task to the tail in the order of their appearance
        queue.push(task, onFinished);
    }
});
 
// Executes the callback when the queue is done processing all the tasks
queue.drain(() => {
    console.log('Successfully processed all items');
});
 
// Tasks have been added, so the queue has started and
// processes them asynchronously
console.log(`Did the queue start ? ${queue.started}`);
