Docs Menu
Docs Home
/ / /
Pymongo ドライバー
/

フラグメントと セルクエリの統合

このチュートリアルでは、 MongoDB、Flask、および Celery を使用してニュースレタープラットフォームを構築する方法を学びます。このアプリケーションを使用すると、ユーザーはメールでニュースレターをサブスクライブし、管理者はバッチするを管理して非同期に送信できます。

フラグは、プロジェクト間で開発者に一貫性を提供する組み込みの構成と規則のデフォルトを持つ軽量ウェブアプリケーションフレームワークです。詳しくは、Fluk のウェブページをご覧ください。

Celery は、大量のメッセージを効率的に処理するオープンソースの分散タスクキューです。非同期処理とタスクスケジューリングをサポートします。詳しくは、Celery のウェブページ を参照してください。

このチュートリアルでは、JavaScript、Flask、 MongoDBサンプルプロジェクト Githubリポジトリを使用して、Newsletter Platform のサンプルアプリケーションを再作成します。

このチュートリアルを開始する前に、次のコンポーネントがインストールされ、設定されていることを確認してください。

  • MongoDBクラスター。Atlas の使用をお勧めします。Atlas クラスターの作成方法については、Atlas ドキュメントの「 Atlas を使い始める 」ページを参照してください。

  • クラスター内の newsletter という名前のデータベース。詳細については、Atlasガイドの 「データベースの作成」 ページを参照してください。

  • Perl のメッセージ プロバイダーとして使用する RapidMQ を使用します。

  • SMTPサーバーとして使用する Gmail 。SMTP サーバーの詳細については、Wikipedia の 簡易メール転送プロトコル のページを参照してください。

  • Python 3.9 以降

1

プロジェクトディレクトリの名前は newsletter です。ディレクトリを作成し、ターミナルで次のコマンドを実行中してそのディレクトリに移動します。

mkdir newsletter
cd newsletter

次のファイルには、アプリケーションのコードが保持されます。

  • app.py: Flukアプリケーションのメインエントリ点

  • config.py: MongoDB接続 URI、メールサーバー構成、Celery ノードでの接続、その他の環境固有の変数など、アプリケーションの構成設定

  • tasks.py: メールを非同期に送信するためのバックグラウンド タスクを定義します

  • routes.py:アプリケーションが応答するルート(URL)を定義します

アプリケーションを個別の考慮事項で構成することをお勧めします。これにより、アプリケーションはモジュール型であり、より保守可能になります。

プロジェクトディレクトリに、次の構造を作成します。

newsletter/
├── app.py
├── config.py
├── routes.py
├── tasks.py
├── templates/
│ ├── admin.html
│ └── subscribe.html
└── static/
└── styles.css
2

アプリケーションは次のライブラリを使用します。

  • ウェブサーバーとルーティング を処理するためのフラグメント

  • アプリケーションからメールを送信するための Fluk-Mail

  • PyMongo

  • バッチするメールの送信などのタスクを管理するための照合

Tip

仮想環境の使用

Python仮想環境を使用すると、さまざまなプロジェクトに対してさまざまなバージョンのライブラリをインストールできます。pip コマンドを実行中前に、virtualenv がアクティブであることを確認してください。

依存関係をインストールするには、ターミナルで次の pip コマンドを実行します。

pip install flask-pymongo Flask-Mail celery

config.pyファイルには、次のアクションを実行するための設定と認証情報が含まれています。

  • メッセージ プロバイダーとして行います

  • SMTPサーバーとして Gmail を使用するようにFlask-Mail を構成する

  • アプリケーションをMongoDBデプロイに接続する

次のコードを config.pyファイルに追加して、必要な構成を定義します。

import os
class Config:
MAIL_USERNAME = '<username>' # Your email address without '@gmail.com'
MAIL_PASSWORD = '<app password>'
MAIL_DEFAULT_SENDER = '<email address>'
MONGO_URI = '<mongodb connection string>'
DATABASE_NAME = "newsletter"
ALLOWED_IPS = ['127.0.0.1']
MAIL_SERVER = 'smtp.gmail.com'
MAIL_PORT = 587
MAIL_USE_TLS = True
CELERY_BROKER_URL = 'amqp://guest:guest@localhost//'
RESULT_BACKEND = MONGO_URI + '/celery_results'

アプリケーションでメールの送信を有効にするには、Gmail の認証情報とメール(MAIL_USERNAMEMAIL_PASSWORDMAIL_DEFAULT_SENDER)を提供する必要があります。セキュリティ上の理由から、プライマリ パスワードを使用するのではなく、使用するアプリパスワードを生成することをお勧めします。詳しくは、Google アカウントの「App Password」設定を参照してください。

また、MONGO_URI 環境変数として設定する接続文字列も指定する必要があります。 詳細については、このガイドの「接続文字列の作成」セクションを参照してください。

提供された Celer ブロッキングURL (CELERY_BROKER_URL)は、RapidMQ をプロバイダーとして指定しますが、このURL をカスタマイズして、他の実装をサポートすることができます。詳細については、 Celer ドキュメントのブロック設定セクションを参照してください。

ALLOWED_IPS リストは Send Newsletter ページへのアクセスを制御するために使用されます。変数の残りの部分は、Flask と Celery コンポーネントを構成します。

app.pyファイルは、アプリケーションの主要なコンポーネントを初期化し、構成します。次のタスクを実行します。

  • Fluskアプリケーションを作成し、構成定数をロードします

  • アプリのメールサーバー設定でFlask-Mailインスタンスを初期化

  • PyMongoドライバー を使用して newsletter MongoDBデータベースに接続します

  • Fluskアプリと選択したブロッキングサーバーで構成された Celeryインスタンスを作成します

次のコードを app.pyファイルに追加して、Flask、 MongoDB 、Celery を初期化します。

from flask import Flask
from flask_mail import Mail
from flask_pymongo import PyMongo
from celery import Celery
# Create a Flask application
app = Flask(__name__)
app.config.from_object('config.Config')
# Create a Flask-Mail instance
mail = Mail(app)
# Connect to MongoDB
client = PyMongo(app).cx
db = client[app.config['DATABASE_NAME']]
# Create a Celery instance
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)
from routes import *
from tasks import *
if __name__ == '__main__':
app.run(debug=True)

セカンダリタスクは、app.pyファイルでインスタンス化されたコンポーネントを使用して、サブスクライブ者にニュースレターメールを送信します。

@celery.task() 修飾子は、関数を Celerタスクとして登録します。bind=True を設定すると、関数はタスクインスタンスを self 引数として受け取ります。これにより、 Celeryタスクメソッドとメタデータにアクセスできるようになります。タスクの詳細については、 セカンダリ を参照してください。アプリ。タスクAPIドキュメントを参照してください。

このタスクはFlusk のHTTPリクエストサイクルの外部で実行されるため、メールロジックを with app.app_context() ブロックでラップしてアプリケーションコンテキストを手動で提供する必要があります。これにより、Flask は Flusk-Mail mailインスタンスや newsletter MongoDBデータベースへのPyMongo接続などの他のコンポーネントにアクセスできます。

この関数は subscribers のリストをループ処理し、 Flusk-Mail Messageクラスを使用してメールを作成し、 mailオブジェクトを使用して各ユーザーに送信します。各メールが送信された後に、 MongoDB deliveriesコレクションにドキュメントを挿入してメッセージが送信されたことをレコードことで配信をログに記録します。各メール操作はtry ブロックにラップされ、エラーが発生した場合に失敗がログに記録され、データベースが誤った配信レコードで更新されないようにしています。

次のコードを tasks.pyファイルに追加して、send_emails() 関数を定義します。

from flask_mail import Message
from app import app, mail, db, celery
from datetime import datetime
@celery.task(bind=True)
def send_emails(self, subscribers, title, body):
with app.app_context():
for subscriber in subscribers:
try:
print(f"Sending email to {subscriber['email']}")
msg = Message(title, recipients=[subscriber['email']])
msg.body = body
mail.send(msg)
db.deliveries.insert_one({
'email': subscriber['email'],
'title': title,
'body': body,
'delivered_at': datetime.utcnow()
})
print("Email sent")
except Exception as e:
print(f"Failed to send email to {subscriber['email']}: {str(e)}")
return {'result': 'All emails sent'}

[Flask] では、@app.route() 修飾子は特定の関数へのURLパスを割り当てます。次のコードでは、ルート(/)、/admin/subscribe/send-newsletters ルートの定義に使用されます。任意の methods パラメータは、許可されたHTTPメソッドのリストを定義するために一部のインスタンスでは使用されます。

@app.before_request() 修飾子は、すべてのリクエストの前に関数を実行するように設定します。この場合、関数は admin ページへのアクセスを config.pyファイルに定義された ALLOWED_IPS パラメータにリストされているIPアドレスに制限することで、基本的なセキュリティを提供します。具体的には、アクセスは localhost に対してのみ許可されます。

ルートと /admin は、render_template() メソッドを使用してレンダリング ページをルーティングします。/subscribe/send-newsletters は、request.form[] のアクセスリクエストパラメータをルーティングしてコマンドを実行し、 HTTPレスポンスを返します。

次のコードを routes.pyファイルに追加してルートを定義します。

from flask import render_template, request, abort, jsonify
from app import app, db
from tasks import send_emails
@app.before_request
def limit_remote_addr():
if 'X-Forwarded-For' in request.headers:
remote_addr = request.headers['X-Forwarded-For'].split(',')[0]
else:
remote_addr = request.remote_addr
if request.endpoint == 'admin' and remote_addr not in app.config['ALLOWED_IPS']:
abort(403)
@app.route('/')
def home():
return render_template('subscribe.html')
@app.route('/admin')
def admin():
return render_template('admin.html')
@app.route('/subscribe', methods=['POST'])
def subscribe():
first_name = request.form['firstname']
last_name = request.form['lastname']
email = request.form['email']
if db.users.find_one({'email': email}):
return """
<div class="response error">
<span class="icon">&#x2716;</span> This email is already subscribed!
</div>
""", 409
db.users.insert_one({'firstname': first_name, 'lastname': last_name, 'email': email, 'subscribed': True})
return """
<div class="response success">
<span class="icon">&#x2714;</span> Subscribed successfully!
</div>
""", 200
@app.route('/send-newsletters', methods=['POST'])
def send_newsletters():
title = request.form['title']
body = request.form['body']
subscribers = list(db.users.find({'subscribed': True}))
for subscriber in subscribers:
subscriber['_id'] = str(subscriber['_id'])
send_emails.apply_async(args=[subscribers, title, body])
return jsonify({'message': 'Emails are being sent!'}), 202

このファイルでは、アプリケーションのセキュリティ保護をさらに強化したり、ユーザー向けアラートをカスタマイズしたりできます。

templatesディレクトリ内の HTML ファイルはユーザー インターフェースを定義し、標準の HTML を使用して記述されています。このアプリケーションは非同期HTTPリクエストを使用するため、これらのファイル内のスクリプトは Fetch API呼び出し を使用します。これらのスクリプトは、タイムアウトやエラーも処理します。

次のコードを subscribe.htmlファイルにコピーして、Subscribe to Newsletter ページを作成します。

<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Subscribe to Newsletter</title>
<link rel="stylesheet" href="{{ url_for('static', filename='styles.css') }}">
</head>
<body>
<h1>Subscribe to our Newsletter</h1>
<form id="subscribe-form">
<label for="firstname">First Name:</label>
<input type="text" id="firstname" name="firstname" required>
<br>
<label for="lastname">Last Name:</label>
<input type="text" id="lastname" name="lastname" required>
<br>
<label for="email">Email:</label>
<input type="email" id="email" name="email" required>
<br>
<button type="submit">Subscribe</button>
</form>
<div id="response"></div>
<script>
document.getElementById('subscribe-form').addEventListener('submit', function(event) {
event.preventDefault();
var formData = new FormData(event.target);
fetch('/subscribe', {
method: 'POST',
body: formData
}).then(response => {
if (!response.ok) {
throw response;
}
return response.text();
}).then(data => {
document.getElementById('response').innerHTML = data;
document.getElementById('subscribe-form').reset();
setTimeout(() => {
document.getElementById('response').innerHTML = '';
}, 3000);
}).catch(error => {
error.text().then(errorMessage => {
document.getElementById('response').innerHTML = errorMessage;
setTimeout(() => {
document.getElementById('response').innerHTML = '';
}, 3000);
});
});
});
</script>
</body>
</html>

管理ページスクリプトには、send_newsletter 呼び出しの成功を示すユーザーへのアラートが表示されます。

次のコードを admin.htmlファイルにコピーして、Send Newsletter ページを作成します。

<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Admin - Send Newsletter</title>
<link rel="stylesheet" href="{{ url_for('static', filename='styles.css') }}">
</head>
<body>
<h1>Send Newsletter</h1>
<form id="admin-form">
<label for="title">Title:</label>
<input type="text" id="title" name="title" required>
<br>
<label for="body">Body:</label>
<textarea id="body" name="body" required></textarea>
<br>
<button type="submit">Send</button>
</form>
<div id="response"></div>
<script>
document.getElementById('admin-form').addEventListener('submit', function(event) {
event.preventDefault();
var formData = new FormData(event.target);
fetch('/send-newsletters', {
method: 'POST',
body: formData
})
.then(response => response.json())
.then(() => {
document.getElementById('response').innerText = 'Emails are being sent!';
setTimeout(() => {
document.getElementById('response').innerText = '';
}, 3000);
document.getElementById('admin-form').reset();
})
.catch(error => {
document.getElementById('response').innerText = 'Error sending emails.';
setTimeout(() => {
document.getElementById('response').innerText = '';
}, 3000);
console.error('Error:', error);
});
});
</script>
</body>
</html>

次のコードを styles.cssファイルに追加することで、テンプレートにスタイル シートを適用できます。

body {
font-family: system-ui;
font-optical-sizing: auto;
font-weight: 300;
font-style: normal;
margin: 0;
padding: 0;
display: flex;
flex-direction: column;
align-items: center;
justify-content: center;
height: 100vh;
background-color: #040100;
}
h1 {
color: white;
}
form {
background: #023430;
padding: 30px 40px;
border-radius: 8px;
box-shadow: 0 0 10px rgba(0, 0, 0, 0.1);
width: 100%;
max-width: 400px;
margin: 20px 0;
}
label {
display: block;
margin-bottom: 8px;
font-weight: bold;
color: white;
}
input[type="text"],
input[type="email"],
textarea {
width: 100%;
padding: 10px;
margin-bottom: 10px;
border: 1px solid #ccc;
border-radius: 4px;
font-size: 16px;
}
button {
background: #00ED64;
color: white;
padding: 10px 20px;
border: none;
border-radius: 4px;
cursor: pointer;
font-size: 16px;
font-family: "Nunito", sans-serif;
}
button:hover {
background: #00684A;
}
#response {
margin-top: 20px;
font-size: 16px;
color: #28a745;
}
footer {
text-align: center;
padding: 20px;
margin-top: 20px;
font-size: 16px;
color: #666;
}

前の手順を完了すると、 MongoDB、Flask、および Celery を使用してニュースレタープラットフォームを管理する動作するアプリケーションが作成されます。

アプリケーションをテストするには、次の手順に従います。

1

RapidMQノードを起動します。手順については、ご使用のオペレーティング システムの RabbitMQ のドキュメント を参照してください。

MacOS の場合:

brew services start rabbitmq

Windows の場合:

rabbitmq-service start

On Linux/Unix:

sudo systemctl start rabbitmq-server
2

次のコードを使用して、アプリケーションを起動します。

flask --app app run

別のターミナルで、Celary ワーカーを起動します。

celery -A app.celery worker --loglevel=info
3

ブラウザで localhost:5000 に移動して Subscribe to our Newsletter ページを開きます。

サブスクライブ情報を入力し、[Subscribe] をクリックします。

新しいサブスクライブを作成したことを確認するには、Atlas を開き、newsletterデータベース内の usersコレクションに移動します。

4

ブラウザで localhost:5000/admin に移動して Send Newsletter ページを開きます。ニュースレターの詳細を入力し、Send をクリックします。

Celery ワーカーログには、次の画像のような Email sentログエントリが表示されます。

[2025-05-27 09:54:43,873: INFO/ForkPoolWorker-7] Task tasks.send_emails[7d7f9616-7b9b-4508-a889-95c35f54fe43] succeeded in 3.93334774998948s: {'result': 'All emails sent'}
[2025-05-27 10:04:52,043: INFO/MainProcess] Task tasks.send_emails[ac2ec70f-2d3e-444a-95bb-185ac659f460] received
[2025-05-27 10:04:52,046: WARNING/ForkPoolWorker-7] Sending email to <subscriber_email>
[2025-05-27 10:04:53,474: WARNING/ForkPoolWorker-7] Email sent

また、 newsletterデータベースの deliveriesコレクションに移動しても、メールを送信したことを確認できます。

このアプリケーションでは、FlaskアプリケーションをCeler のタスクキューと統合して、サブスクライブ データを管理し、バッチするメールを送信する方法を説明します。このアプリケーションに をビルドして、Flask または Celery を試すことができます。改善される可能性のあるものには、次の変更が含まれます。

このチュートリアルで使用されているコンポーネントの詳細については、次のリソースを参照してください。

MongoDB Community に貢献したりするには、MongoDB Developer Community ページを参照してください。

戻る

サードパーティ統合

項目一覧