Overview
在本教程中,您可以学习;了解如何使用MongoDB、Flask 和 Celery构建新闻简报平台。该应用程序允许用户订阅新闻通讯,并允许管理员异步管理和批处理发送电子邮件。
Flask
Flask 是一个轻量级 Web应用程序框架,具有内置配置和约定默认值,可为跨项目的开发人员提供一致性。有关更多信息,请参阅 Flask 网页。
Celery
Celery 是一个开源分布式任务队列,可高效处理大量消息。它支持异步处理和任务调度。有关更多信息,请参阅Celery 网页。
Tutorial
本教程使用JavaScript、Flask 和MongoDB示例项目 Github存储库在 Newsletter Platform 中重新创建了示例应用程序。
先决条件
在开始本教程之前,请确保已安装并设立以下组件:
设置
创建项目目录和结构
项目目录的名称是 newsletter。创建目录并通过在终端中运行以下命令导航到该目录:
mkdir newsletter cd newsletter
以下文件将保存应用程序的代码:
app.py:Flask应用程序的主入口点config.py:应用程序的配置设置,包括MongoDB连接 URI、邮件服务器配置、Celery 代理连接和任何其他特定于环境的变量tasks.py:定义背景任务以异步发送电子邮件routes.py:定义应用程序响应的路由 (URL)
我们建议在构建应用程序时划分关注点,这样可以使应用程序模块化且更易于维护。
在项目目录中,创建以下结构:
newsletter/ ├── app.py ├── config.py ├── routes.py ├── tasks.py ├── templates/ │ ├── admin.html │ └── subscribe.html └── static/ └── styles.css
安装所需的Python包
您的应用程序使用以下库:
Flask-Mail,用于从应用程序发送电子邮件
用于管理任务的 Celery,例如批处理发送电子邮件
在终端中运行以下 pip 命令以安装依赖项:
pip install flask-pymongo Flask-Mail celery
配置应用程序
config.py文件包含用于执行以下操作的设置和凭证:
将 Celery 连接到 RabbitMQ 作为其消息代理
配置 Flask-Mail 以使用 Gmail 作为其 SMTP服务器
将应用程序连接到MongoDB 部署
通过将以下代码添加到 config.py文件来定义必要的配置:
import os class Config: MAIL_USERNAME = '<username>' # Your email address without '@gmail.com' MAIL_PASSWORD = '<app password>' MAIL_DEFAULT_SENDER = '<email address>' MONGO_URI = '<mongodb connection string>' DATABASE_NAME = "newsletter" ALLOWED_IPS = ['127.0.0.1'] MAIL_SERVER = 'smtp.gmail.com' MAIL_PORT = 587 MAIL_USE_TLS = True CELERY_BROKER_URL = 'amqp://guest:guest@localhost//' RESULT_BACKEND = MONGO_URI + '/celery_results'
您必须提供 Gmail凭证和电子邮件(MAIL_USERNAME、MAIL_PASSWORD 和 MAIL_DEFAULT_SENDER),应用程序启用发送电子邮件。出于安全考虑,我们建议您生成要使用的应用密码,而不是使用主节点 (primary node in the replica set)密码。有关更多信息,请参阅 Google 帐户中的应用程序密码设置。
您还必须提供一个连接字符串,将其设立为 MONGO_URI 环境变量。 有关更多信息,请参阅本指南的“创建连接字符串”部分。
提供的 Celery 代理URL (CELERY_BROKER_URL) 将 RabbitMQ 指定为其代理,但您可以自定义此URL以支持其他实施。有关更多信息,请参阅 Celery 文档的代理设置部分。
ALLOWED_IPS 列表用于控制对 Send
Newsletter 页面的访问权限。其余变量配置 Flask 和 Celery 组件。
初始化 Flask、 MongoDB和 Celery
app.py文件初始化并配置应用程序的核心组件。它执行以下任务:
创建 Flask应用程序并加载配置常量
使用应用程序的邮件服务器设置初始化 Flask-Mail实例
使用PyMongo驾驶员连接到
newsletterMongoDB 数据库创建一个配置有 Flask应用和所选代理的 Celery实例
通过将以下代码添加到 app.py文件来初始化 Flask、 MongoDB和 Celery:
from flask import Flask from flask_mail import Mail from flask_pymongo import PyMongo from celery import Celery # Create a Flask application app = Flask(__name__) app.config.from_object('config.Config') # Create a Flask-Mail instance mail = Mail(app) # Connect to MongoDB client = PyMongo(app).cx db = client[app.config['DATABASE_NAME']] # Create a Celery instance celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL']) celery.conf.update(app.config) from routes import * from tasks import * if __name__ == '__main__': app.run(debug=True)
创建 Celery 任务
Celery任务使用 app.py文件中实例化的组件向订阅者发送新闻简报电子邮件。
@celery.task() 装饰器将该函数注册为 Celery任务。设置 bind=True 意味着该函数接收任务实例作为 self 参数,这允许它访问权限Celery任务方法和元数据。有关任务的更多信息,请参阅 celery。应用。任务API文档。
由于此任务在 Flask 的HTTP请求周期之外运行,因此您必须通过将电子邮件逻辑包装在 with
app.app_context()区块中来手动提供应用程序程序上下文。这使 Flask访问权限其他组件,例如 Flask-Mail mail实例以及PyMongo与 newsletter MongoDB 数据库的连接。
此函数循环遍历 subscribers 列表,使用 Flask-Mail Message 类创建电子邮件,然后使用 mail对象将其发送给每个用户。每封电子邮件发送后,它都会在MongoDB deliveries集合中插入一个文档,以记录邮件的发送情况,从而记录发送情况。每个电子邮件操作都包装在 try区块中,以确保在出现错误时记录故障,并且不会使用错误的传递记录更新数据库。
通过将以下代码添加到 tasks.py文件来定义 send_emails() 函数:
from flask_mail import Message from app import app, mail, db, celery from datetime import datetime def send_emails(self, subscribers, title, body): with app.app_context(): for subscriber in subscribers: try: print(f"Sending email to {subscriber['email']}") msg = Message(title, recipients=[subscriber['email']]) msg.body = body mail.send(msg) db.deliveries.insert_one({ 'email': subscriber['email'], 'title': title, 'body': body, 'delivered_at': datetime.utcnow() }) print("Email sent") except Exception as e: print(f"Failed to send email to {subscriber['email']}: {str(e)}") return {'result': 'All emails sent'}
定义路由
在 Flask 中,@app.route() 装饰器为特定函数分配URL路径。在以下代码中,它用于定义根 (/)、/admin、/subscribe 和 /send-newsletters 路由。在某些情况下,可选的 methods 参数用于定义允许的HTTP方法列表。
@app.before_request() 装饰器会设置一个在每个请求之前运行的函数。在本例中,该函数将对 admin 页面的访问权限限制为 config.py文件中定义的 ALLOWED_IPS 参数所列出的IP地址,从而提供了一些基本的安全性。具体来说,只允许 localhost访问权限。
根和 /admin 使用 render_template() 方法路由呈现页面。/subscribe 和 /send-newsletters 在 request.form[] 中路由访问权限请求参数以执行命令,然后返回HTTP响应。
通过将以下代码添加到 routes.py文件来定义路由:
from flask import render_template, request, abort, jsonify from app import app, db from tasks import send_emails def limit_remote_addr(): if 'X-Forwarded-For' in request.headers: remote_addr = request.headers['X-Forwarded-For'].split(',')[0] else: remote_addr = request.remote_addr if request.endpoint == 'admin' and remote_addr not in app.config['ALLOWED_IPS']: abort(403) def home(): return render_template('subscribe.html') def admin(): return render_template('admin.html') def subscribe(): first_name = request.form['firstname'] last_name = request.form['lastname'] email = request.form['email'] if db.users.find_one({'email': email}): return """ <div class="response error"> <span class="icon">✖</span> This email is already subscribed! </div> """, 409 db.users.insert_one({'firstname': first_name, 'lastname': last_name, 'email': email, 'subscribed': True}) return """ <div class="response success"> <span class="icon">✔</span> Subscribed successfully! </div> """, 200 def send_newsletters(): title = request.form['title'] body = request.form['body'] subscribers = list(db.users.find({'subscribed': True})) for subscriber in subscribers: subscriber['_id'] = str(subscriber['_id']) send_emails.apply_async(args=[subscribers, title, body]) return jsonify({'message': 'Emails are being sent!'}), 202
您可以在此文件中为应用程序添加更多安全保护或自定义面向用户的警报。
创建页面模板
templates目录中的 HTML 文件定义用户界面,并使用标准 HTML 编写。由于此应用程序使用异步HTTP请求,因此这些文件中的脚本使用 Fetch API调用。这些脚本还处理超时和错误。
订阅页面
将以下代码复制到 subscribe.html文件以创建 Subscribe to Newsletter 页面。
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <meta name="viewport" content="width=device-width, initial-scale=1.0"> <title>Subscribe to Newsletter</title> <link rel="stylesheet" href="{{ url_for('static', filename='styles.css') }}"> </head> <body> <h1>Subscribe to our Newsletter</h1> <form id="subscribe-form"> <label for="firstname">First Name:</label> <input type="text" id="firstname" name="firstname" required> <br> <label for="lastname">Last Name:</label> <input type="text" id="lastname" name="lastname" required> <br> <label for="email">Email:</label> <input type="email" id="email" name="email" required> <br> <button type="submit">Subscribe</button> </form> <div id="response"></div> <script> document.getElementById('subscribe-form').addEventListener('submit', function(event) { event.preventDefault(); var formData = new FormData(event.target); fetch('/subscribe', { method: 'POST', body: formData }).then(response => { if (!response.ok) { throw response; } return response.text(); }).then(data => { document.getElementById('response').innerHTML = data; document.getElementById('subscribe-form').reset(); setTimeout(() => { document.getElementById('response').innerHTML = ''; }, 3000); }).catch(error => { error.text().then(errorMessage => { document.getElementById('response').innerHTML = errorMessage; setTimeout(() => { document.getElementById('response').innerHTML = ''; }, 3000); }); }); }); </script> </body> </html>
管理页面
管理页面脚本向用户显示警报,指示 send_newsletter 调用成功。
将以下代码复制到 admin.html文件以创建 Send Newsletter 页面:
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <meta name="viewport" content="width=device-width, initial-scale=1.0"> <title>Admin - Send Newsletter</title> <link rel="stylesheet" href="{{ url_for('static', filename='styles.css') }}"> </head> <body> <h1>Send Newsletter</h1> <form id="admin-form"> <label for="title">Title:</label> <input type="text" id="title" name="title" required> <br> <label for="body">Body:</label> <textarea id="body" name="body" required></textarea> <br> <button type="submit">Send</button> </form> <div id="response"></div> <script> document.getElementById('admin-form').addEventListener('submit', function(event) { event.preventDefault(); var formData = new FormData(event.target); fetch('/send-newsletters', { method: 'POST', body: formData }) .then(response => response.json()) .then(() => { document.getElementById('response').innerText = 'Emails are being sent!'; setTimeout(() => { document.getElementById('response').innerText = ''; }, 3000); document.getElementById('admin-form').reset(); }) .catch(error => { document.getElementById('response').innerText = 'Error sending emails.'; setTimeout(() => { document.getElementById('response').innerText = ''; }, 3000); console.error('Error:', error); }); }); </script> </body> </html>
设置页面格式
您可以通过将以下代码添加到 styles.css文件来将样式表应用模板:
body { font-family: system-ui; font-optical-sizing: auto; font-weight: 300; font-style: normal; margin: 0; padding: 0; display: flex; flex-direction: column; align-items: center; justify-content: center; height: 100vh; background-color: #040100; } h1 { color: white; } form { background: #023430; padding: 30px 40px; border-radius: 8px; box-shadow: 0 0 10px rgba(0, 0, 0, 0.1); width: 100%; max-width: 400px; margin: 20px 0; } label { display: block; margin-bottom: 8px; font-weight: bold; color: white; } input[type="text"], input[type="email"], textarea { width: 100%; padding: 10px; margin-bottom: 10px; border: 1px solid #ccc; border-radius: 4px; font-size: 16px; } button { background: #00ED64; color: white; padding: 10px 20px; border: none; border-radius: 4px; cursor: pointer; font-size: 16px; font-family: "Nunito", sans-serif; } button:hover { background: #00684A; } #response { margin-top: 20px; font-size: 16px; color: #28a745; } footer { text-align: center; padding: 20px; margin-top: 20px; font-size: 16px; color: #666; }
测试应用程序
完成前面的步骤后,您就拥有了一个可用的应用程序,它使用MongoDB、Flask 和 Celery 来管理新闻简报平台。
您可以使用以下步骤来测试您的应用程序:
启动背景服务
启动您的 RabbitMQ节点。有关说明,请参阅适用于您操作系统的 RabbitMQ 文档。
在 MacOS 上:
brew services start rabbitmq
在 Windows 上:
rabbitmq-service start
On Linux/Unix:
sudo systemctl start rabbitmq-server
创建订阅者
在浏览器中导航到 localhost:5000 以打开 Subscribe to our Newsletter 页面。
输入订阅者信息,然后单击 Subscribe。
要确认您已创建新订阅者,请打开Atlas并导航到 newsletter数据库中的 users集合。
发送新闻简报
在浏览器中导航至 localhost:5000/admin 以打开 Send Newsletter 页面。输入新闻简报详细信息,然后单击 Send。
您的 Celery 工作日志将显示类似于下图的 Email sent日志条目:
[2025-05-27 09:54:43,873: INFO/ForkPoolWorker-7] Task tasks.send_emails[7d7f9616-7b9b-4508-a889-95c35f54fe43] succeeded in 3.93334774998948s: {'result': 'All emails sent'} [2025-05-27 10:04:52,043: INFO/MainProcess] Task tasks.send_emails[ac2ec70f-2d3e-444a-95bb-185ac659f460] received [2025-05-27 10:04:52,046: WARNING/ForkPoolWorker-7] Sending email to <subscriber_email> [2025-05-27 10:04:53,474: WARNING/ForkPoolWorker-7] Email sent
您还可以导航到 newsletter数据库中的 deliveries集合,确认您已发送电子邮件。
后续步骤
此应用程序演示了如何将 Flask应用程序与 Celery任务队列集成,以管理订阅者数据和发送批处理电子邮件。您可以在此应用程序的基础上构建,以尝试使用 Flask 或 Celery。一些可能的改进包括以下更改:
更多资源
有关本教程中使用的组件的更多信息,请参阅以下资源:
要寻求支持或为MongoDB Community做出贡献,请参阅MongoDB开发者社区页面。