📜  使用Flask,Redis和Celery的异步任务(1)

📅  最后修改于: 2023-12-03 15:36:35.541000             🧑  作者: Mango

使用Flask,Redis和Celery的异步任务

简介

Flask是一个轻量级的Python web框架,广泛应用于小型应用的开发。Redis是一个开源的缓存和消息队列,是一个高性能的NoSQL数据库。Celery是一个Python开发的分布式任务队列,旨在实现高效异步任务的处理。

在本篇文章中,我们将结合Flask、Redis和Celery,演示如何使用它们来实现一个异步任务的处理。

安装依赖

我们需要安装以下依赖包:

  • flask
  • redis
  • celery

可以使用pip来安装这些依赖:

pip install flask redis celery
创建Flask应用

接下来,我们来创建一个基本的Flask应用,并建立一个简单的API接口。

from flask import Flask

app = Flask(__name__)

@app.route('/process')
def process():
    return 'Processing...'

现在,我们创建了一个Flask应用,并建立了一个名为process的API接口。

设置Celery

接下来,我们需要设置Celery,以便能够使用它来处理我们的异步任务。

from celery import Celery

# 创建Celery实例
celery = Celery('tasks', broker='redis://localhost:6379/0')

@celery.task
def process_task():
    return 'Task processed!'

我们首先导入了Celery模块,并创建了一个Celery实例。通过设置broker参数,我们指定了使用Redis作为任务队列。

然后,我们定义了一个名为process_task的任务。为了使其成为异步任务,我们使用了@celery.task装饰器。

创建API接口

接下来,我们需要建立一个API接口,该接口将调用我们的异步任务。

from flask import jsonify

@app.route('/start')
def start():
    task = process_task.delay()
    return jsonify({'task_id': task.id})

/start接口中,我们首先调用了process_task.delay()方法,以开启一个异步任务。然后,我们将任务的ID返回给客户端。

创建结果查询API接口

在任务完成后,客户端可能会想要查询任务的结果。因此,我们需要建立一个API接口,该接口接收任务ID,并返回任务的结果。

@app.route('/result/<task_id>')
def result(task_id):
    task = process_task.AsyncResult(task_id)
    if task.ready():
        return task.get()
    else:
        return 'Task not ready...'

这个接口首先使用任务ID来查询异步任务的状态。如果任务已完成,则返回其结果。否则,返回一个错误信息。

总结

到目前为止,我们已经演示了如何在Flask应用中使用Redis和Celery来处理异步任务。对于需要处理长时间运行的任务的应用程序,这是一个非常有用的功能。

from flask import Flask, jsonify
from celery import Celery

app = Flask(__name__)
celery = Celery('tasks', broker='redis://localhost:6379/0')

@celery.task
def process_task():
    return 'Task processed!'

@app.route('/process')
def process():
    return 'Processing...'

@app.route('/start')
def start():
    task = process_task.delay()
    return jsonify({'task_id': task.id})

@app.route('/result/<task_id>')
def result(task_id):
    task = process_task.AsyncResult(task_id)
    if task.ready():
        return task.get()
    else:
        return 'Task not ready...'

if __name__ == '__main__':
    app.run()