Docs 菜单
Docs 主页
/ /

Atlas延迟指导

以下各节列出了在 Atlas 部署中可以为减少延迟而做出的配置选择。

物理距离是延迟的主要原因。用户与您的应用程序之间、应用程序与数据之间以及集群节点之间的距离都会影响系统延迟和应用程序性能。

为了减少写入操作的延迟,必须将应用程序和数据放置在地理位置上距离用户更近的地方。承载数据的Atlas节点是Atlas 集群中的服务器节点,用于存储数据库数据并处理写入操作。为了支持应用程序用户更快地数据访问,请将数据Atlas节点部署到地理位置靠近大多数应用程序用户的云提供商区域。

如果您的应用程序用户分布在多个地理区域,例如美国和欧洲,我们建议您在每个地理区域部署一个或多个区域,以减少每个位置的用户延迟。要了解更多关于多区域部署的信息,请参阅 多区域部署范例。

如果您的数据按地理位置划分,以便每个地理位置的用户访问权限不同的数据集,则您还可以按地区或地理位置对数据分片,以便为每个地理位置的用户优化写入性能。这种方法允许您处理大型数据集和高吞吐量,同时确保数据局部性。

复制是将数据从主节点复制到从节点。可以调整以下复制配置选项,以最大限度地减少副本集中读写操作的延迟:

  • 写关注(write concern)级别: 配置写关注(write concern)时,需要在写入延迟和写入持久性之间进行权衡。MongoDB 的默认写关注(write concern)设立为 majority,这要求在 Atlas 向客户端确认操作已完成并成功之前,每个写入操作都必须复制到副本集的大多数投票、数据承载节点。设置较高的写关注(write concern)会增加写入延迟,但也会提高写入持久性并防止副本集故障转移期间发生回滚

  • 读关注(read concern)和读取偏好(read preference):配置读关注(read concern)读取偏好(read preference)时,需要在查询延迟、数据可用性和查询响应的一致性之间进行权衡。MongoDB默认的全局读关注(read concern)为 local,它要求读取操作仅从本地副本集的一个节点读取,而无需等待确认数据已复制到其他节点。该节点是主节点 (primary node in the replica set)还是从节点(secondary node from replica set)由您的读取偏好(read preference)决定,Atlas默认将其设置为 primary。这种默认的读关注(read concern)和偏好组合可优化来自副本集最新节点的最低延迟读取,但它也存在以下风险:主节点 (primary node in the replica set)节点的数据可能不持久性,并且可能在故障转移,如果没有可用的主节点 (primary node in the replica set)节点,用户将无法查询数据。

    您可以将读取偏好更改为primaryPreferred ,允许读取操作在没有可用的主节点或存在地理位置较近的从节点时读取从节点,但如果从节点不是最新的,则此运行存在返回陈旧数据的风险。此风险可以通过增加您的写关注来缓解,确保更多从节点保持最新状态,但这会增加您的写入延迟。

    重要

    请记住,由于复制延迟,可能会有从节点返回过时的数据。

  • 查询超时限制:您可以在部署中设立全局和操作级别的查询超时限制,以减少应用程序在超时之前等待响应的时间。这可以防止持续查询长时间对部署性能产生负面影响。

  • 节点选举优先级:为增加在副本集选举中主数据中心的成员先于备用数据中心的成员当选主节点的可能性,您可以将备用数据中心成员的members[n].priority优先级设置得低于主数据中心成员的优先级。例如,如果您将集群部署在AWS 区域us-east-1(北弗吉尼亚)和us-west-1(北加州),且大多数用户都在加州,您可以优先设置 AWS us-west-1(北加州)区域的节点,确保主节点地理上始终靠近大多数用户,并且能以最小延迟响应读写操作。

  • 镜像读:镜像读通过在从节点上预热缓存,减轻服务中断后主节点重新选举的影响。有关详细信息,请参阅镜像读。

有关根据您的需求实现最佳复制配置的更多指导,请联系 MongoDB 的Professional Services

您可以通过以下网络连接选项来提高安全性并进一步降低延迟:

  • 私有端点: 私有端点 在应用程序的虚拟网络和Atlas 集群之间建立直接、安全的连接,从而可能减少网络跃点并改善延迟。

  • VPC 对等互连:在副本集中配置 VPC 对等互连,以允许应用程序连接到对等区域。在发生故障转移的情况下,VPC 对等互连为应用程序服务器提供了一种检测新主节点并相应地路由流量的方式。

应用程序访问数据的速度会影响延迟。良好的数据建模查询优化可以提高数据访问速度。例如,您可以:

  • 减小文档大小:考虑缩短字段名称和值长度,以减少通过网络传输的数据量。

  • 优化查询模式:有效使用索引,最大限度地减少需要跨区域读取的数据量。

Atlas提供实时性能面板(实时性能面板),用于观察不同区域的延迟指标。您还可以实现应用程序级监控,追踪应用程序的往返延迟。在最终生产部署之前,我们建议在各种多区域场景下进行性能测试,以识别和解决延迟瓶颈。

要了解有关监控部署的更多信息,请参阅Atlas 监控和警报指导

我们建议您尽可能使用基于应用程序编程语言的最新驱动程序版本的连接方法。虽然 Atlas 提供的默认连接字符串是一个不错的起点,但您可以根据特定应用程序和部署架构,向连接字符串添加连接字符串选项以提高性能。

对于企业级应用程序部署,调整连接池设置以满足用户需求,同时最大限度地减少操作延迟尤其重要。示例,您可以使用 minPoolSizemaxPoolSize 选项来调整大多数数据库客户端连接的打开方式和时间,从而防止或针对相关网络开销带来的延迟峰值进行规划。

这些设置的配置范围取决于您的部署架构。示例,如果您的应用程序部署利用单线程资源(如 AWS Lambda),您的应用程序将只能打开和使用一个客户端连接。要详细了解如何以及在何处创建和使用连接池,以及在何处指定连接池设置,请参阅连接池概述。

以下示例应用程序汇集了本页面上旨在减少数据操作延迟的关键建议:

  • 使用 Atlas 提供的连接字符串,具有可重试写入、多数写关注和默认读关注。

  • 使用maxTimeMS方法指定 optime 限制。 有关如何设置maxTimeMS的说明,请参阅特定的驱动程序文档。

  • 处理重复键和超时的错误。

该应用程序是一个HTTP API,允许客户端创建或列出用户记录。它公开一个接受 GET 和 帖子 请求的终结点 http://localhost:3000:

方法
端点
说明

GET

/users

users 集合中获取用户名列表。

POST

/users

要求在请求正文中加入 name。将新用户添加到 users 集合。

注意

以下服务器应用程序使用了 NanoHTTPD 和JSON ,您需要将其作为依赖项添加到项目中,然后才能运行。

1// File: App.java
2
3import java.util.Map;
4import java.util.logging.Logger;
5
6import org.bson.Document;
7import org.json.JSONArray;
8
9import com.mongodb.MongoException;
10import com.mongodb.client.MongoClient;
11import com.mongodb.client.MongoClients;
12import com.mongodb.client.MongoCollection;
13import com.mongodb.client.MongoDatabase;
14
15import fi.iki.elonen.NanoHTTPD;
16
17public class App extends NanoHTTPD {
18 private static final Logger LOGGER = Logger.getLogger(App.class.getName());
19
20 static int port = 3000;
21 static MongoClient client = null;
22
23 public App() throws Exception {
24 super(port);
25
26 // Replace the uri string with your MongoDB deployment's connection string
27 String uri = "<atlas-connection-string>";
28 client = MongoClients.create(uri);
29
30 start(NanoHTTPD.SOCKET_READ_TIMEOUT, false);
31 LOGGER.info("\nStarted the server: http://localhost:" + port + "/ \n");
32 }
33
34 public static void main(String[] args) {
35 try {
36 new App();
37 } catch (Exception e) {
38 LOGGER.severe("Couldn't start server:\n" + e);
39 }
40 }
41
42 @Override
43 public Response serve(IHTTPSession session) {
44 StringBuilder msg = new StringBuilder();
45 Map<String, String> params = session.getParms();
46
47 Method reqMethod = session.getMethod();
48 String uri = session.getUri();
49
50 if (Method.GET == reqMethod) {
51 if (uri.equals("/")) {
52 msg.append("Welcome to my API!");
53 } else if (uri.equals("/users")) {
54 msg.append(listUsers(client));
55 } else {
56 msg.append("Unrecognized URI: ").append(uri);
57 }
58 } else if (Method.POST == reqMethod) {
59 try {
60 String name = params.get("name");
61 if (name == null) {
62 throw new Exception("Unable to process POST request: 'name' parameter required");
63 } else {
64 insertUser(client, name);
65 msg.append("User successfully added!");
66 }
67 } catch (Exception e) {
68 msg.append(e);
69 }
70 }
71
72 return newFixedLengthResponse(msg.toString());
73 }
74
75 static String listUsers(MongoClient client) {
76 MongoDatabase database = client.getDatabase("test");
77 MongoCollection<Document> collection = database.getCollection("users");
78
79 final JSONArray jsonResults = new JSONArray();
80 collection.find().forEach((result) -> jsonResults.put(result.toJson()));
81
82 return jsonResults.toString();
83 }
84
85 static String insertUser(MongoClient client, String name) throws MongoException {
86 MongoDatabase database = client.getDatabase("test");
87 MongoCollection<Document> collection = database.getCollection("users");
88
89 collection.insertOne(new Document().append("name", name));
90 return "Successfully inserted user: " + name;
91 }
92}

注意

以下服务器应用程序使用 Express,您需要先将其作为依赖项添加到项目中,然后才能运行它。

1const express = require('express');
2const bodyParser = require('body-parser');
3
4// Use the latest drivers by installing & importing them
5const MongoClient = require('mongodb').MongoClient;
6
7const app = express();
8app.use(bodyParser.json());
9app.use(bodyParser.urlencoded({ extended: true }));
10
11const uri = "mongodb+srv://<db_username>:<db_password>@cluster0-111xx.mongodb.net/test?retryWrites=true&w=majority";
12
13const client = new MongoClient(uri, {
14 useNewUrlParser: true,
15 useUnifiedTopology: true
16});
17
18// ----- API routes ----- //
19app.get('/', (req, res) => res.send('Welcome to my API!'));
20
21app.get('/users', (req, res) => {
22 const collection = client.db("test").collection("users");
23
24 collection
25 .find({})
26 .maxTimeMS(5000)
27 .toArray((err, data) => {
28 if (err) {
29 res.send("The request has timed out. Please check your connection and try again.");
30 }
31 return res.json(data);
32 });
33});
34
35app.post('/users', (req, res) => {
36 const collection = client.db("test").collection("users");
37 collection.insertOne({ name: req.body.name })
38 .then(result => {
39 res.send("User successfully added!");
40 }, err => {
41 res.send("An application error has occurred. Please try again.");
42 })
43});
44// ----- End of API routes ----- //
45
46app.listen(3000, () => {
47 console.log(`Listening on port 3000.`);
48 client.connect(err => {
49 if (err) {
50 console.log("Not connected: ", err);
51 process.exit(0);
52 }
53 console.log('Connected.');
54 });
55});

注意

以下 Web应用程序使用 FastAPI。要创建新的应用程序,请使用FastAPI示例文件结构。

1# File: main.py
2
3from fastapi import FastAPI, Body, Request, Response, HTTPException, status
4from fastapi.encoders import jsonable_encoder
5
6from typing import List
7from models import User
8
9import pymongo
10from pymongo import MongoClient
11from pymongo import errors
12
13# Replace the uri string with your |service| connection string
14uri = "<atlas-connection-string>"
15db = "test"
16
17app = FastAPI()
18
19@app.on_event("startup")
20def startup_db_client():
21 app.mongodb_client = MongoClient(uri)
22 app.database = app.mongodb_client[db]
23
24@app.on_event("shutdown")
25def shutdown_db_client():
26 app.mongodb_client.close()
27
28##### API ROUTES #####
29@app.get("/users", response_description="List all users", response_model=List[User])
30def list_users(request: Request):
31 try:
32 users = list(request.app.database["users"].find().max_time_ms(5000))
33 return users
34 except pymongo.errors.ExecutionTimeout:
35 raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="The request has timed out. Please check your connection and try again.")
36
37@app.post("/users", response_description="Create a new user", status_code=status.HTTP_201_CREATED)
38def new_user(request: Request, user: User = Body(...)):
39 user = jsonable_encoder(user)
40 try:
41 new_user = request.app.database["users"].insert_one(user)
42 return {"message":"User successfully added!"}
43 except pymongo.errors.DuplicateKeyError:
44 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Could not create user due to existing '_id' value in the collection. Try again with a different '_id' value.")

后退

可扩展性

在此页面上