📜  使用Flask,Redis和Celery的异步任务

📅  最后修改于: 2020-09-03 05:05:27             🧑  作者: Mango

介绍

随着Web应用程序的发展和使用的增加,用例也越来越多样化。我们现在正在建设和使用网站来执行比以往任何时候都更复杂的任务。这些任务中的一些可以进行处理,并将反馈立即转发给用户,而其他一些则需要稍后进一步处理和中继结果。越来越多地采用Internet访问和具有Internet功能的设备导致最终用户流量增加。

为了处理增加的流量或增加的功能复杂性,有时我们可能会选择推迟工作并在以后的时间中继结果。这样,我们就无法让用户在我们的Web应用程序上等待未知的时间,而是在以后的时间发送结果。我们可以通过在流量较低的情况下利用后台任务来处理工作或分批处理工作来实现这一目标。

我们可以使用的解决方案之一是Celery。它可以帮助我们分解复杂的工作,并让它们由不同的机器执行,以减轻一台机器上的负载或减少完成时间。

在本文中,我们将探讨Celery在Flask应用程序中安排后台任务的使用,以减轻资源密集型任务的负担并确定对最终用户的响应的优先级。

什么是任务队列?

任务队列是一种分配小的工作单元或任务的机制,可以在不干扰大多数基于Web的应用程序的请求-响应周期的情况下执行这些任务。

任务队列有助于委派工作,否则将在等待响应时降低应用程序的速度。它们还可以用于在主机或进程与用户交互时处理资源密集型任务。

这样,与用户的交互是一致的,及时的,并且不受工作量的影响。

什么是芹菜?

Celery是一个异步任务队列,它基于传递的分布式消息在计算机或线程之间分配工作负载。芹菜系统由一个客户,一个经纪人和几个工人组成。

这些工作人员负责执行队列中放置的任务或工作并转发结果。使用Celery,您可以同时拥有本地和远程工作人员,这意味着可以通过Internet将工作委派给功能更强大的其他计算机,并将结果中继回客户端。

这样,可以减轻主机上的负载,并且可以使用更多资源来处理用户请求。

Celery设置中的客户端负责向工作人员发布作业,并使用消息代理与他们进行通信。代理通过消息队列促进客户端与Celery安装中的工作人员之间的通信,消息队列中将消息添加到队列中,然后代理将消息传递给客户端。

此类消息代理的示例包括RedisRabbitMQ

为什么要使用芹菜?

为什么要让Celery进行后台任务有多种原因。首先,它具有很好的可扩展性,允许按需添加更多的工作人员,以适应增加的负载或流量。Celery仍在积极开发中,这意味着它是一个受支持的项目,同时其简洁的文档和活跃的用户社区也是如此。

另一个优势是Celery易于集成到多个Web框架中,其中大多数都具有促进集成的库。

它还提供了通过不存在支持交互的库的webhooks与其他Web应用程序交互的功能。

Celery还可以使用各种消息代理,这为我们提供了灵活性。建议使用RabbitMQ,但它也可以支持Redis和Beanstalk

演示申请

我们将构建一个Flask应用程序,该应用程序允许用户设置提醒,该提醒将在设定的时间传递到他们的电子邮件中。

我们还将提供自定义消息或提醒被调用并将消息发送给用户之前的时间量的功能。

建立

与其他项目一样,我们的工作将在虚拟环境中进行,我们将使用Pipenv工具创建和管理该虚拟环境: 

$ pipenv install --three
$ pipenv shell

对于此项目,我们将需要安装Flask和Celery软件包以开始: 

$ pipenv install flask celery

这是我们的Flask应用程序文件结构的样子: 

.
├── Pipfile                    # manage our environment
├── Pipfile.lock
├── README.md
├── __init__.py
├── app.py                     # main Flask application implementation
├── config.py                  # to host the configuration
├── requirements.txt           # store our requirements
└── templates
    └── index.html             # the landing page

1 directory, 8 files

对于基于Celery的项目,我们将Redis用作消息代理,我们可以在其主页上找到设置它的说明。

实作

让我们从创建Flask应用程序开始,该应用程序将呈现一个表单,该表单允许用户输入将来发送的消息的详细信息。

我们将以下内容添加到我们的app.py文件中: 

from flask import Flask, flash, render_template, request, redirect, url_for

app = Flask(__name__)
app.config.from_object("config")
app.secret_key = app.config['SECRET_KEY']

@app.route('/', methods=['GET', 'POST'])
def index():
    if request.method == 'GET':
        return render_template('index.html')

    elif request.method == 'POST':
        email = request.form['email']
        first_name = request.form['first_name']
        last_name = request.form['last_name']
        message = request.form['message']
        duration = request.form['duration']
        duration_unit = request.form['duration_unit']

        flash(“Message scheduled")
        return redirect(url_for('index'))


if __name__ == '__main__':
    app.run(debug=True)

这是一个非常简单的应用程序,只有一条路由即可处理GETPOST请求表单。提交详细信息后,我们可以将数据移交给可以安排作业的功能。

为了整理主应用程序文件,我们将配置变量放在单独的config.py文件中,并从文件中加载配置:

app.config.from_object("config")

我们的config.py文件将与文件位于同一文件夹中,app.py并包含一些基本配置:

SECRET_KEY = 'very_very_secure_and_secret'
# more config

现在,让我们将目标网页实现为index.html

{% for message in get_flashed_messages() %}
  

{{ message }}

{% endfor %}
First Name: Last Name: Email: Message: Duration:

为了简洁起见,样式和格式已被截断,请随意设置HTML格式/样式。

现在,我们可以启动我们的应用程序:

使用Flask邮件发送电子邮件

为了从Flask应用程序发送电子邮件,我们将使用Flask-Mail库,该库将添加到我们的项目中,如下所示: 

$ pipenv install flask-mail

有了Flask应用程序和表单,我们现在可以将Flask-Mail集成到我们的app.py

from flask_mail import Mail, Message

app = Flask(__name__)
app.config.from_object("config")
app.secret_key = app.config['SECRET_KEY']

# set up Flask-Mail Integration
mail = Mail(app)

def send_mail(data):
    """ Function to send emails.
    """
    with app.app_context():
        msg = Message("Ping!",
                    sender="admin.ping",
                    recipients=[data['email']])
        msg.body = data['message']
        mail.send(msg)

该函数send_main(data)将接收要发送的消息和电子邮件的收件人,然后在经过指定的时间后将其调用以将电子邮件发送给用户。

config.py为了使Flask-Mail正常运行,我们还需要向我们添加以下变量:

# Flask-Mail
MAIL_SERVER = 'smtp.googlemail.com'
MAIL_PORT = 587
MAIL_USE_TLS = True
MAIL_USERNAME = 'mail-username'
MAIL_PASSWORD = 'mail-password'

Celery整合

在我们的Flask应用程序准备就绪并具备电子邮件发送功能之后,我们现在可以集成Celery,以便安排以后发送电子邮件。

我们app.py将再次被修改:

# Existing imports are maintained
from celery import Celery

# Flask app and flask-mail configuration truncated

# Set up celery client
client = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
client.conf.update(app.config)

# Add this decorator to our send_mail function
@client.task
def send_mail(data):
    # Function remains the same

@app.route('/', methods=['GET', 'POST'])
def index():
    if request.method == 'GET':
        return render_template('index.html')

    elif request.method == 'POST':
        data = {}
        data['email'] = request.form['email']
        data['first_name'] = request.form['first_name']
        data['last_name'] = request.form['last_name']
        data['message'] = request.form['message']
        duration = int(request.form['duration'])
        duration_unit = request.form['duration_unit']

        if duration_unit == 'minutes':
            duration *= 60
        elif duration_unit == 'hours':
            duration *= 3600
        elif duration_unit == 'days':
            duration *= 86400

        send_mail.apply_async(args=[data], countdown=duration)
        flash(f"Email will be sent to {data['email']} in {request.form['duration']} {duration_unit}")

        return redirect(url_for('index'))

celery通过附加消息传递代理的URL,我们导入并使用它在Flask应用程序中初始化Celery客户端。在我们的例子中,我们将使用Redis作为代理,因此我们将以下内容添加到我们的config.py

CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'

为了使我们的send_mail()功能作为后台任务执行,我们将添加@client.task装饰器,以便我们的Celery客户端会意识到这一点。

设置Celery客户端后,将修改还可以处理表单输入的主要功能。

首先,我们将send_mail()函数的输入数据打包在字典中。然后,我们使用函数通过Celery Task Calling API调用我们的邮件函数,该函数apply_async接受函数所需的参数。

设置了一个可选countdown参数,定义了运行代码和执行任务之间的延迟。

该持续时间以秒为单位,这就是我们根据用户选择的时间单位将用户传递的持续时间转换为秒的原因。

用户提交表单后,我们将确认收到的邮件,并在邮件发送出去时通过标语消息通知他们。

 

汇集一切

为了运行我们的项目,我们将需要两个终端,一个终端启动我们的Flask应用程序,另一个终端启动Celery worker,后者将在后台发送消息。

在第一个终端中启动Flask应用程序:

$ python app.py

在第二个终端中,启动虚拟环境,然后启动Celery worker:

# start the virtualenv
$ pipenv shell
$ celery worker -A app.client --loglevel=info

如果一切顺利,我们将在运行Celery客户端的终端中获得以下反馈:

现在,让我们导航http://localhost:5000并填写详细信息,以计划在提交2分钟后到达的电子邮件。

在表格上方,将显示一条消息,指示将接收电子邮件的地址以及发送电子邮件的持续时间。在我们的Celery终端中,我们还将能够看到一个日志条目,表明我们的电子邮件已被调度:

[2019-10-23 16:27:25,399: INFO/MainProcess] Received task: app.send_mail[d65025c8-a291-40d0-aea2-e816cb40cd78]  ETA:[2019-10-23 13:29:25.170622+00:00]

ETA条目的部分显示何时send_email()调用我们的函数,以及何时发送电子邮件。

到目前为止,一切都很好。我们的电子邮件正在计划中,并在指定的时间发送出去,但是,缺少一件事。在执行任务之前或之后,我们无法看到任务,也无法得知电子邮件是否已实际发送。

因此,让我们为后台任务实现一个监视解决方案,以便我们可以查看任务,并注意出现问题以及未按计划执行任务的情况。

 

使用Flower监控Delery集群

Flower是一个基于Web的工具,它将提供我们Celery设置的可见性,并提供查看任务进度,历史记录,详细信息和统计信息(包括成功率或失败率)的功能。我们还可以监视集群中的所有工作人员及其当前正在处理的任务。

安装Flower非常简单:

$ pipenv install flower

之前,我们在app.py文件中指定了Celery客户的详细信息。我们需要将该客户传递给Flower以便对其进行监控。

为此,我们需要打开第三个终端窗口,进入虚拟环境,然后启动监视工具:

$ pipenv shell
$ flower -A app.client --port=5555

启动Flower时,我们通过将Celery客户端传递给application(-A)参数来指定它,并通过该--port参数指定要使用的端口。

有了我们的监控功能后,让我们计划在仪表板上发送另一封电子邮件,然后导航到http://localhost:5555,在以下位置我们会对此表示欢迎:

在此页面上,我们可以看到Celery集群中的工作人员列表,该列表当前仅由我们的机器组成。

要查看我们刚刚计划的电子邮件,请单击仪表板左上方的“ 任务”按钮,这将带我们到可以查看已计划的任务的页面:

在本部分中,我们可以看到我们已计划了两封电子邮件,并且已在计划的时间成功发送了一封电子邮件。出于测试目的,计划分别在1分钟和5分钟后发送电子邮件。

我们还可以从本节中看到收到文本的时间和执行时间。

在“监视器”部分中,有一些图形显示了后台任务的成功率和失败率。

我们可以根据需要安排消息的时间,但这也意味着我们的工作人员必须在线并且可以在执行任务时正常工作。

结论

我们已经成功建立了Celery集群并将其集成到我们的Flask应用程序中,该应用程序允许用户计划在将来的某个时间后发送电子邮件。

电子邮件发送功能已委派给后台任务,并放置在队列中,该队列将由我们本地Celery集群中的工作人员选择并执行。

与往常一样,该项目的源代码可在Github上获得