Docs 菜单
Docs 主页
/
Cloud Manager
/

使用 MongoDB 构建弹性应用程序

要编写能够利用 MongoDB 功能并妥善处理副本集选举的应用程序代码,您应该:

  • 安装最新的驱动程序。

  • 使用指定所有主机的连接字符串。

  • 使用可重试写入和可重试读取。

  • 使用对您的应用程序有意义的 majority 写关注和读关注。

  • 处理应用程序中的错误。

首先,从 MongoDB 驱动程序安装适合您的语言的最新驱动程序。驱动程序将查询从应用程序连接并中继到数据库。使用最新的驱动程序可以使用最新的 MongoDB 功能。

然后,在应用程序中,导入依赖项:

如果使用 Maven ,将以下内容添加到pom.xml 依赖项列表中:

<dependencies>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-sync</artifactId>
<version>4.0.1</version>
</dependency>
</dependencies>

如果使用 Gradle,则将以下内容添加到 build.gradle 依赖项列表中:

dependencies {
compile 'org.mongodb:mongodb-driver-sync:4.0.1'
}
// Latest 'mongodb' version installed with npm
const MongoClient = require('mongodb').MongoClient;

使用指定部署中所有主机的连接string将应用程序连接到数据库。 如果您的部署执行副本集选举并选举了新的主节点,则指定部署中所有主机的连接string会在没有应用程序逻辑的情况下发现新的主节点。

您可以使用以下任一方法指定部署中的所有主机:

连接string还可以指定选项,特别是retryWriteswriteConcern。

提示

如需格式化连接string的帮助,请参阅 使用MongoDB驱动程序连接到部署。

使用连接字符串在应用程序中实例化 MongoDB 客户端:

// Create a variable for your connection string
String uri = "mongodb://[<username>:<password>@]hostname0<:port>[,hostname1:<port1>][,hostname2:<port2>][...][,hostnameN:<portN>]";
// Instantiate the MongoDB client with the URI
MongoClient client = MongoClients.create(uri);
// Create a variable for your connection string
const uri = "mongodb://[<username>:<password>@]hostname0<:port>[,hostname1:<port1>][,hostname2:<port2>][...][,hostnameN:<portN>]";
// Instantiate the MongoDB client with the URI
const client = new MongoClient(uri, {
useNewUrlParser: true,
useUnifiedTopology: true
});

注意

从 MongoDB 4.2版本开始以及4.2兼容驱动程序,MongoDB 默认会重试一次写入和读取操作。

如果某些写入操作失败,则使用可重试写入重试一次这些操作。

重试写入一次是处理暂时性网络错误和副本集选举(在此类错误中,应用程序暂时无法找到正常主节点)的最佳策略。 如果重试成功,则整个操作成功,并且不会返回错误。 如果操作失败,原因可能是:

  • 持续的网络错误,或

  • 无效命令。

提示

有关启用可重试写入的更多信息,请参阅启用可重试写入。

当操作失败时,应用程序需要自行处理错误

如果读取操作在 MongoDB 4.2版本中和使用4.2兼容驱动程序中启动失败,则会自动重试一次。 您无需将应用程序配置为重试读取。

可以使用写关注和读关注来调整应用程序的一致性和可用性。更严格的关注意味着数据库操作需要等待更强的数据一致性保证,而宽松的一致性要求则提供更高的可用性。

例子

如果您的应用程序处理货币余额,则一致性极为重要。 您可以使用majority写关注和读关注(read concern)来确保您永远不会读取过时的数据或可能回滚的数据。

或者,如果您的应用程序每秒记录来自数百个传感器的温度数据,您可能不会担心读取的数据是否不包括最新读数。 您可以放宽一致性要求,以更快地访问该数据。

您可以通过连接string URI 设置副本集的 写关注级别 。使用majority写关注确保您的数据成功写入数据库并持久保存。 这是推荐的默认值,对于大多数使用案例来说,这是足够的。

当您使用需要确认的写关注(例如 majority)时,您还可以指定写入达到该确认级别的最大时间限制:

  • 用于所有写入的 wtimeoutMS 连接字符串参数,或

  • 用于单次写入操作的 wtimeout 选项。

是否使用时间限制以及使用的值取决于应用程序上下文。

提示

有关设置写关注(write concern)级别的更多信息,请参阅写关注选项。

重要

如果没有指定写入时间限制,并且写关注(write concern)级别为无法实现,则写入操作将永远无法完成。

您可以通过连接string URI副本集的 读关注(read concern)级别 。理想的读关注(read concern)取决于应用程序要求,但默认对于大多数使用案例来说就足够了。 使用默认读关注不需要连接string参数。

指定读关注(read concern)可以提高对应用程序从数据库接收的数据的保证。

提示

有关设置读关注级别的更多信息,请参阅读关注选项。

注意

应用程序使用的写入关注和读关注(read concern)的特定组合会影响操作顺序ACID 一致性保证。 这称为因果一致性。 有关因果一致性ACID 一致性保证的更多信息,请参阅因果一致性和读写关注。

重试写入未处理的无效命令、网络中断和网络错误都会返回错误。 有关错误详细信息,请参阅驱动程序的 API文档。

例如,如果应用程序尝试插入具有重复_id的文档,您的驱动程序将返回错误,其中包括:

Unable to insert due to an error: com.mongodb.MongoWriteException:
E11000 duplicate key error collection: <db>.<collection> ...
{
"name": : "MongoError",
"message": "E11000 duplicate key error collection on: <db>.<collection> ... ",
...
}

如果没有正确的错误处理,错误可能会阻止应用程序处理请求,直到重新启动为止。

您的应用程序应处理错误,而不会崩溃或产生副作用。 在前面的应用程序插入重复_id的示例中,该应用程序可以按如下方式处理错误:

// Declare a logger instance from java.util.logging.Logger
private static final Logger LOGGER = ...
...
try {
InsertOneResult result = collection.insertOne(new Document()
.append("_id", 1)
.append("body", "I'm a goofball trying to insert a duplicate _id"));
// Everything is OK
LOGGER.info("Inserted document id: " + result.getInsertedId());
// Refer to the API documentation for specific exceptions to catch
} catch (MongoException me) {
// Report the error
LOGGER.severe("Failed due to an error: " + me);
}
...
collection.insertOne({
_id: 1,
body: "I'm a goofball trying to insert a duplicate _id"
})
.then(result => {
response.sendStatus(200) // send "OK" message to the client
},
err => {
response.sendStatus(400); // send "Bad Request" message to the client
});

此示例中的插入操作在第二次调用时会引发“重复键”错误,因为_id字段必须是唯一的。 应用程序捕获错误,通知客户端,然后应用继续运行。 但是,插入操作失败,您可以决定是否向用户显示消息、重试操作或执行其他操作。

您应该始终记录错误。进一步处理错误的常见策略包括:

  • 将错误返回给客户端,并显示错误消息。 当您无法解决错误并且需要通知用户操作无法完成时,这是一个很好的策略。

  • 写入备份数据库。 当您无法解决错误但又不想冒丢失请求数据的风险时,这是一个很好的策略。

  • 单次默认重试之后重试该操作。如果您可以通过编程方式解决错误原因,请重试,这是一个很好的策略。

您必须为应用程序上下文选择最佳策略。

例子

在重复键错误的示例中,您应该记录错误但不要重试操作,因为它永远不会成功。相反,您可以写入回退数据库并稍后查看该数据库的内容,以确保不会丢失任何信息。用户无需执行任何其他操作,数据就会被记录下来,因此您可以选择不向客户端发送错误消息。

当操作永远无法完成并阻止应用程序执行新操作时,返回错误可能是理想行为。 您可以使用maxTimeMS方法对单个操作设置时间限制,如果超过该时间限制,则返回错误供应用程序进行处理。

对每个操作设置的时间限制取决于该操作的上下文。

例子

如果您的应用程序读取并显示 inventory 集合中的简单产品信息,您可以确信这些读取操作只需要一点时间。查询长时间运行表明一直存在网络问题。将该操作的 maxTimeMS 设置为 5000(即 5 秒)意味着,一旦您确信存在网络问题,应用程序就会收到反馈。

以下示例应用程序汇集了构建弹性应用程序的建议。

该应用程序是一个简单的用户记录 API ,它在 http://localhost: 上公开两个端点:3000

方法
端点
说明

GET

/users

users 集合中获取用户名列表。

POST

/users

要求在请求正文中加入 name。将新用户添加到 users 集合。

注意

以下服务器应用程序使用 NanoHTTPDJSON 在运行之前,您需要将其作为依赖项添加到项目中。

1// File: App.java
2
3import java.util.Map;
4import java.util.logging.Logger;
5
6import org.bson.Document;
7import org.json.JSONArray;
8
9import com.mongodb.MongoException;
10import com.mongodb.client.MongoClient;
11import com.mongodb.client.MongoClients;
12import com.mongodb.client.MongoCollection;
13import com.mongodb.client.MongoDatabase;
14
15import fi.iki.elonen.NanoHTTPD;
16
17public class App extends NanoHTTPD {
18 private static final Logger LOGGER = Logger.getLogger(App.class.getName());
19
20 static int port = 3000;
21 static MongoClient client = null;
22
23 public App() throws Exception {
24 super(port);
25
26 // Replace the uri string with your MongoDB deployment's connection string
27 String uri = "mongodb://<username>:<password>@hostname0:27017,hostname1:27017,hostname2:27017/?retryWrites=true&w=majority";
28 client = MongoClients.create(uri);
29
30 start(NanoHTTPD.SOCKET_READ_TIMEOUT, false);
31 LOGGER.info("\nStarted the server: http://localhost:" + port + "/ \n");
32 }
33
34 public static void main(String[] args) {
35 try {
36 new App();
37 } catch (Exception e) {
38 LOGGER.severe("Couldn't start server:\n" + e);
39 }
40 }
41
42 @Override
43 public Response serve(IHTTPSession session) {
44 StringBuilder msg = new StringBuilder();
45 Map<String, String> params = session.getParms();
46
47 Method reqMethod = session.getMethod();
48 String uri = session.getUri();
49
50 if (Method.GET == reqMethod) {
51 if (uri.equals("/")) {
52 msg.append("Welcome to my API!");
53 } else if (uri.equals("/users")) {
54 msg.append(listUsers(client));
55 } else {
56 msg.append("Unrecognized URI: ").append(uri);
57 }
58 } else if (Method.POST == reqMethod) {
59 try {
60 String name = params.get("name");
61 if (name == null) {
62 throw new Exception("Unable to process POST request: 'name' parameter required");
63 } else {
64 insertUser(client, name);
65 msg.append("User successfully added!");
66 }
67 } catch (Exception e) {
68 msg.append(e);
69 }
70 }
71
72 return newFixedLengthResponse(msg.toString());
73 }
74
75 static String listUsers(MongoClient client) {
76 MongoDatabase database = client.getDatabase("test");
77 MongoCollection<Document> collection = database.getCollection("users");
78
79 final JSONArray jsonResults = new JSONArray();
80 collection.find().forEach((result) -> jsonResults.put(result.toJson()));
81
82 return jsonResults.toString();
83 }
84
85 static String insertUser(MongoClient client, String name) throws MongoException {
86 MongoDatabase database = client.getDatabase("test");
87 MongoCollection<Document> collection = database.getCollection("users");
88
89 collection.insertOne(new Document().append("name", name));
90 return "Successfully inserted user: " + name;
91 }
92}

注意

以下服务器应用程序使用 Express,您需要先将其作为依赖项添加到项目中,然后才能运行它。

1const express = require('express');
2const bodyParser = require('body-parser');
3
4// Use the latest drivers by installing & importing them
5const MongoClient = require('mongodb').MongoClient;
6
7const app = express();
8app.use(bodyParser.json());
9app.use(bodyParser.urlencoded({ extended: true }));
10
11// Use a connection string that lists all hosts
12// with retryable writes & majority write concern
13const uri = "mongodb://<username>:<password>@hostname0:27017,hostname1:27017,hostname2:27017/?retryWrites=true&w=majority";
14
15const client = new MongoClient(uri, {
16 useNewUrlParser: true,
17 useUnifiedTopology: true
18});
19
20// ----- API routes ----- //
21app.get('/', (req, res) => res.send('Welcome to my API!'));
22
23app.get('/users', (req, res) => {
24 const collection = client.db("test").collection("users");
25
26 collection
27 .find({})
28 // In this example, 'maxTimeMS' throws an error after 5 seconds,
29 // alerting the application to a lasting network outage
30 .maxTimeMS(5000)
31 .toArray((err, data) => {
32 if (err) {
33 // Handle errors in your application
34 // In this example, by sending the client a message
35 res.send("The request has timed out. Please check your connection and try again.");
36 }
37 return res.json(data);
38 });
39});
40
41app.post('/users', (req, res) => {
42 const collection = client.db("test").collection("users");
43 collection.insertOne({ name: req.body.name })
44 .then(result => {
45 res.send("User successfully added!");
46 }, err => {
47 // Handle errors in your application
48 // In this example, by sending the client a message
49 res.send("An application error has occurred. Please try again.");
50 })
51});
52// ----- End of API routes ----- //
53
54app.listen(3000, () => {
55 console.log(`Listening on port 3000.`);
56 client.connect(err => {
57 if (err) {
58 console.log("Not connected: ", err);
59 process.exit(0);
60 }
61 console.log('Connected.');
62 });
63});

后退

监控指标

在此页面上