Docs 菜单
Docs 主页
/ /

集成 Flask 和 Celery

在本教程中,您可以学习;了解如何使用MongoDB、Flask 和 Celery构建新闻简报平台。该应用程序允许用户订阅新闻通讯,并允许管理员异步管理和批处理发送电子邮件。

Flask 是一个轻量级 Web应用程序框架,具有内置配置和约定默认值,可为跨项目的开发人员提供一致性。有关更多信息,请参阅 Flask 网页。

Celery 是一个开源分布式任务队列,可高效处理大量消息。它支持异步处理和任务调度。有关更多信息,请参阅Celery 网页。

本教程使用JavaScript、Flask 和MongoDB示例项目 Github存储库在 Newsletter Platform 中重新创建了示例应用程序。

在开始本教程之前,请确保已安装并设立以下组件:

  • 一个MongoDB 集群。我们建议您使用Atlas。要学习;了解如何创建Atlas 集群,请参阅Atlas文档中的Atlas入门页面。

  • 集群中名为 newsletter 的数据库。有关更多信息,请参阅Atlas指南中的创建数据库页面。

  • RabbitMQ 用作 Celery 的消息代理。

  • Gmail 用作 SMTP服务器。有关 SMTP 服务器的更多信息,请参阅简单邮件传输协议维基百科页面。

  • Python 3.9 或更高版本

1

项目目录的名称是 newsletter。创建目录并通过在终端中运行以下命令导航到该目录:

mkdir newsletter
cd newsletter

以下文件将保存应用程序的代码:

  • app.py:Flask应用程序的主入口点

  • config.py:应用程序的配置设置,包括MongoDB连接 URI、邮件服务器配置、Celery 代理连接和任何其他特定于环境的变量

  • tasks.py:定义背景任务以异步发送电子邮件

  • routes.py:定义应用程序响应的路由 (URL)

我们建议在构建应用程序时划分关注点,这样可以使应用程序模块化且更易于维护。

在项目目录中,创建以下结构:

newsletter/
├── app.py
├── config.py
├── routes.py
├── tasks.py
├── templates/
│ ├── admin.html
│ └── subscribe.html
└── static/
└── styles.css
2

您的应用程序使用以下库:

  • 用于处理 Web服务器和路由的 Flask

  • Flask-Mail,用于从应用程序发送电子邮件

  • pymongo

  • 用于管理任务的 Celery,例如批处理发送电子邮件

提示

使用虚拟环境

Python虚拟环境支持为不同项目安装不同版本的库。在运行任何 pip 命令之前,请确保 virtualenv 处于活动状态。

在终端中运行以下 pip 命令以安装依赖项:

pip install flask-pymongo Flask-Mail celery

config.py文件包含用于执行以下操作的设置和凭证:

  • 将 Celery 连接到 RabbitMQ 作为其消息代理

  • 配置 Flask-Mail 以使用 Gmail 作为其 SMTP服务器

  • 将应用程序连接到MongoDB 部署

通过将以下代码添加到 config.py文件来定义必要的配置:

import os
class Config:
MAIL_USERNAME = '<username>' # Your email address without '@gmail.com'
MAIL_PASSWORD = '<app password>'
MAIL_DEFAULT_SENDER = '<email address>'
MONGO_URI = '<mongodb connection string>'
DATABASE_NAME = "newsletter"
ALLOWED_IPS = ['127.0.0.1']
MAIL_SERVER = 'smtp.gmail.com'
MAIL_PORT = 587
MAIL_USE_TLS = True
CELERY_BROKER_URL = 'amqp://guest:guest@localhost//'
RESULT_BACKEND = MONGO_URI + '/celery_results'

您必须提供 Gmail凭证和电子邮件(MAIL_USERNAMEMAIL_PASSWORDMAIL_DEFAULT_SENDER),应用程序启用发送电子邮件。出于安全考虑,我们建议您生成要使用的应用密码,而不是使用主节点 (primary node in the replica set)密码。有关更多信息,请参阅 Google 帐户中的应用程序密码设置

您还必须提供一个连接字符串,将其设立为 MONGO_URI 环境变量。 有关更多信息,请参阅本指南的“创建连接字符串”部分。

提供的 Celery 代理URL (CELERY_BROKER_URL) 将 RabbitMQ 指定为其代理,但您可以自定义此URL以支持其他实施。有关更多信息,请参阅 Celery 文档的代理设置部分。

ALLOWED_IPS 列表用于控制对 Send Newsletter 页面的访问权限。其余变量配置 Flask 和 Celery 组件。

app.py文件初始化并配置应用程序的核心组件。它执行以下任务:

  • 创建 Flask应用程序并加载配置常量

  • 使用应用程序的邮件服务器设置初始化 Flask-Mail实例

  • 使用PyMongo驾驶员连接到 newsletter MongoDB 数据库

  • 创建一个配置有 Flask应用和所选代理的 Celery实例

通过将以下代码添加到 app.py文件来初始化 Flask、 MongoDB和 Celery:

from flask import Flask
from flask_mail import Mail
from flask_pymongo import PyMongo
from celery import Celery
# Create a Flask application
app = Flask(__name__)
app.config.from_object('config.Config')
# Create a Flask-Mail instance
mail = Mail(app)
# Connect to MongoDB
client = PyMongo(app).cx
db = client[app.config['DATABASE_NAME']]
# Create a Celery instance
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)
from routes import *
from tasks import *
if __name__ == '__main__':
app.run(debug=True)

Celery任务使用 app.py文件中实例化的组件向订阅者发送新闻简报电子邮件。

@celery.task() 装饰器将该函数注册为 Celery任务。设置 bind=True 意味着该函数接收任务实例作为 self 参数,这允许它访问权限Celery任务方法和元数据。有关任务的更多信息,请参阅 celery。应用。任务API文档。

由于此任务在 Flask 的HTTP请求周期之外运行,因此您必须通过将电子邮件逻辑包装在 with app.app_context()区块中来手动提供应用程序程序上下文。这使 Flask访问权限其他组件,例如 Flask-Mail mail实例以及PyMongo与 newsletter MongoDB 数据库的连接。

此函数循环遍历 subscribers 列表,使用 Flask-Mail Message 类创建电子邮件,然后使用 mail对象将其发送给每个用户。每封电子邮件发送后,它都会在MongoDB deliveries集合中插入一个文档,以记录邮件的发送情况,从而记录发送情况。每个电子邮件操作都包装在 try区块中,以确保在出现错误时记录故障,并且不会使用错误的传递记录更新数据库。

通过将以下代码添加到 tasks.py文件来定义 send_emails() 函数:

from flask_mail import Message
from app import app, mail, db, celery
from datetime import datetime
@celery.task(bind=True)
def send_emails(self, subscribers, title, body):
with app.app_context():
for subscriber in subscribers:
try:
print(f"Sending email to {subscriber['email']}")
msg = Message(title, recipients=[subscriber['email']])
msg.body = body
mail.send(msg)
db.deliveries.insert_one({
'email': subscriber['email'],
'title': title,
'body': body,
'delivered_at': datetime.utcnow()
})
print("Email sent")
except Exception as e:
print(f"Failed to send email to {subscriber['email']}: {str(e)}")
return {'result': 'All emails sent'}

在 Flask 中,@app.route() 装饰器为特定函数分配URL路径。在以下代码中,它用于定义根 (/)、/admin/subscribe/send-newsletters 路由。在某些情况下,可选的 methods 参数用于定义允许的HTTP方法列表。

@app.before_request() 装饰器会设置一个在每个请求之前运行的函数。在本例中,该函数将对 admin 页面的访问权限限制为 config.py文件中定义的 ALLOWED_IPS 参数所列出的IP地址,从而提供了一些基本的安全性。具体来说,只允许 localhost访问权限。

根和 /admin 使用 render_template() 方法路由呈现页面。/subscribe/send-newslettersrequest.form[] 中路由访问权限请求参数以执行命令,然后返回HTTP响应。

通过将以下代码添加到 routes.py文件来定义路由:

from flask import render_template, request, abort, jsonify
from app import app, db
from tasks import send_emails
@app.before_request
def limit_remote_addr():
if 'X-Forwarded-For' in request.headers:
remote_addr = request.headers['X-Forwarded-For'].split(',')[0]
else:
remote_addr = request.remote_addr
if request.endpoint == 'admin' and remote_addr not in app.config['ALLOWED_IPS']:
abort(403)
@app.route('/')
def home():
return render_template('subscribe.html')
@app.route('/admin')
def admin():
return render_template('admin.html')
@app.route('/subscribe', methods=['POST'])
def subscribe():
first_name = request.form['firstname']
last_name = request.form['lastname']
email = request.form['email']
if db.users.find_one({'email': email}):
return """
<div class="response error">
<span class="icon">&#x2716;</span> This email is already subscribed!
</div>
""", 409
db.users.insert_one({'firstname': first_name, 'lastname': last_name, 'email': email, 'subscribed': True})
return """
<div class="response success">
<span class="icon">&#x2714;</span> Subscribed successfully!
</div>
""", 200
@app.route('/send-newsletters', methods=['POST'])
def send_newsletters():
title = request.form['title']
body = request.form['body']
subscribers = list(db.users.find({'subscribed': True}))
for subscriber in subscribers:
subscriber['_id'] = str(subscriber['_id'])
send_emails.apply_async(args=[subscribers, title, body])
return jsonify({'message': 'Emails are being sent!'}), 202

您可以在此文件中为应用程序添加更多安全保护或自定义面向用户的警报。

templates目录中的 HTML 文件定义用户界面,并使用标准 HTML 编写。由于此应用程序使用异步HTTP请求,因此这些文件中的脚本使用 Fetch API调用。这些脚本还处理超时和错误。

将以下代码复制到 subscribe.html文件以创建 Subscribe to Newsletter 页面。

<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Subscribe to Newsletter</title>
<link rel="stylesheet" href="{{ url_for('static', filename='styles.css') }}">
</head>
<body>
<h1>Subscribe to our Newsletter</h1>
<form id="subscribe-form">
<label for="firstname">First Name:</label>
<input type="text" id="firstname" name="firstname" required>
<br>
<label for="lastname">Last Name:</label>
<input type="text" id="lastname" name="lastname" required>
<br>
<label for="email">Email:</label>
<input type="email" id="email" name="email" required>
<br>
<button type="submit">Subscribe</button>
</form>
<div id="response"></div>
<script>
document.getElementById('subscribe-form').addEventListener('submit', function(event) {
event.preventDefault();
var formData = new FormData(event.target);
fetch('/subscribe', {
method: 'POST',
body: formData
}).then(response => {
if (!response.ok) {
throw response;
}
return response.text();
}).then(data => {
document.getElementById('response').innerHTML = data;
document.getElementById('subscribe-form').reset();
setTimeout(() => {
document.getElementById('response').innerHTML = '';
}, 3000);
}).catch(error => {
error.text().then(errorMessage => {
document.getElementById('response').innerHTML = errorMessage;
setTimeout(() => {
document.getElementById('response').innerHTML = '';
}, 3000);
});
});
});
</script>
</body>
</html>

管理页面脚本向用户显示警报,指示 send_newsletter 调用成功。

将以下代码复制到 admin.html文件以创建 Send Newsletter 页面:

<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Admin - Send Newsletter</title>
<link rel="stylesheet" href="{{ url_for('static', filename='styles.css') }}">
</head>
<body>
<h1>Send Newsletter</h1>
<form id="admin-form">
<label for="title">Title:</label>
<input type="text" id="title" name="title" required>
<br>
<label for="body">Body:</label>
<textarea id="body" name="body" required></textarea>
<br>
<button type="submit">Send</button>
</form>
<div id="response"></div>
<script>
document.getElementById('admin-form').addEventListener('submit', function(event) {
event.preventDefault();
var formData = new FormData(event.target);
fetch('/send-newsletters', {
method: 'POST',
body: formData
})
.then(response => response.json())
.then(() => {
document.getElementById('response').innerText = 'Emails are being sent!';
setTimeout(() => {
document.getElementById('response').innerText = '';
}, 3000);
document.getElementById('admin-form').reset();
})
.catch(error => {
document.getElementById('response').innerText = 'Error sending emails.';
setTimeout(() => {
document.getElementById('response').innerText = '';
}, 3000);
console.error('Error:', error);
});
});
</script>
</body>
</html>

您可以通过将以下代码添加到 styles.css文件来将样式表应用模板:

body {
font-family: system-ui;
font-optical-sizing: auto;
font-weight: 300;
font-style: normal;
margin: 0;
padding: 0;
display: flex;
flex-direction: column;
align-items: center;
justify-content: center;
height: 100vh;
background-color: #040100;
}
h1 {
color: white;
}
form {
background: #023430;
padding: 30px 40px;
border-radius: 8px;
box-shadow: 0 0 10px rgba(0, 0, 0, 0.1);
width: 100%;
max-width: 400px;
margin: 20px 0;
}
label {
display: block;
margin-bottom: 8px;
font-weight: bold;
color: white;
}
input[type="text"],
input[type="email"],
textarea {
width: 100%;
padding: 10px;
margin-bottom: 10px;
border: 1px solid #ccc;
border-radius: 4px;
font-size: 16px;
}
button {
background: #00ED64;
color: white;
padding: 10px 20px;
border: none;
border-radius: 4px;
cursor: pointer;
font-size: 16px;
font-family: "Nunito", sans-serif;
}
button:hover {
background: #00684A;
}
#response {
margin-top: 20px;
font-size: 16px;
color: #28a745;
}
footer {
text-align: center;
padding: 20px;
margin-top: 20px;
font-size: 16px;
color: #666;
}

完成前面的步骤后,您就拥有了一个可用的应用程序,它使用MongoDB、Flask 和 Celery 来管理新闻简报平台。

您可以使用以下步骤来测试您的应用程序:

1

启动您的 RabbitMQ节点。有关说明,请参阅适用于您操作系统的 RabbitMQ 文档

在 MacOS 上:

brew services start rabbitmq

在 Windows 上:

rabbitmq-service start

On Linux/Unix:

sudo systemctl start rabbitmq-server
2

使用以下代码启动应用程序:

flask --app app run

在另一个终端中,启动 Celery Worker:

celery -A app.celery worker --loglevel=info
3

在浏览器中导航到 localhost:5000 以打开 Subscribe to our Newsletter 页面。

输入订阅者信息,然后单击 Subscribe

要确认您已创建新订阅者,请打开Atlas并导航到 newsletter数据库中的 users集合。

4

在浏览器中导航至 localhost:5000/admin 以打开 Send Newsletter 页面。输入新闻简报详细信息,然后单击 Send

您的 Celery 工作日志将显示类似于下图的 Email sent日志条目:

[2025-05-27 09:54:43,873: INFO/ForkPoolWorker-7] Task tasks.send_emails[7d7f9616-7b9b-4508-a889-95c35f54fe43] succeeded in 3.93334774998948s: {'result': 'All emails sent'}
[2025-05-27 10:04:52,043: INFO/MainProcess] Task tasks.send_emails[ac2ec70f-2d3e-444a-95bb-185ac659f460] received
[2025-05-27 10:04:52,046: WARNING/ForkPoolWorker-7] Sending email to <subscriber_email>
[2025-05-27 10:04:53,474: WARNING/ForkPoolWorker-7] Email sent

您还可以导航到 newsletter数据库中的 deliveries集合,确认您已发送电子邮件。

此应用程序演示了如何将 Flask应用程序与 Celery任务队列集成,以管理订阅者数据和发送批处理电子邮件。您可以在此应用程序的基础上构建,以尝试使用 Flask 或 Celery。一些可能的改进包括以下更改:

有关本教程中使用的组件的更多信息,请参阅以下资源:

要寻求支持或为MongoDB Community做出贡献,请参阅MongoDB开发者社区页面。

后退

第三方集成

在此页面上