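Preface
While upgrading from MongoDB 2 to MongoDB 4, I found that the driver API had changed substantially. The old API was still available, but there was no telling when the driver would remove it, so I switched to the new API. One major pitfall is counting documents: this used to be the count() method of DBCollection, and in the new API the count() method of MongoCollection is already deprecated. I took it for granted that MongoCollection's countDocuments was the replacement, and that is where a performance problem crept in. Sure enough, when the MongoDB driver moved from 3.x to 4.x, many of the deprecated APIs were removed.
1. Starting MongoDB
My local environment is macOS; Linux works the same way. Here I set up a standalone MongoDB instance, not a replica set or a sharded cluster, using the Community Edition downloaded from the MongoDB website.
First start it without access control by running mongod directly; on Windows, use a bat or cmd script.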
./mongod --dbpath=../data --logpath=../logs/mongod.log
Log in directly on the local machine
Run use admin to switch to the admin database
db.createUser({user:"<username>",pwd:"<password>",roles:[{"role":"userAdmin","db":"admin"},{"role":"root","db":"admin"},{"role":"userAdminAnyDatabase","db":"admin"}]})
Then restart with --auth
./mongod --dbpath=../data --logpath=../logs/mongod.log --auth &
At this point the single MongoDB node is up and ready to use.
2. Building the demo
Build a Spring Boot application that depends on the MongoDB starter
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    <version>2.5.5</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-mongodb</artifactId>
    <version>2.5.5</version>
</dependency>
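Next, build the mock data and a demo of the count APIs. Note that the automatically generated _id here is an ObjectId: a 12-byte BSON type that encodes a UNIX timestamp, a machine identifier, a process ID, and a counter (newer versions replace the machine identifier and process ID with a random value). An id field of type Long or Integer is not generated automatically and has to be assigned by the application.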
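import java.util.LinkedList;
import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.MongoOperations;
import org.springframework.stereotype.Repository;

@Repository
public class MongoDBRepository {

    @Autowired
    private MongoOperations mongoOperations;

    // Inserts 100000 mock documents in batches of 1000.
    public long insertDemos() {
        List<CountDemoEntity> list = new LinkedList<>();
        for (int i = 0; i < 100000; i++) {
            CountDemoEntity countDemoEntity = new CountDemoEntity();
            countDemoEntity.setDemo("demo" + i);
            countDemoEntity.setDemoName("demo-name" + i);
            list.add(countDemoEntity);
            // Flush when the batch is full. Testing i % 1000 == 0 instead would
            // flush a single document at i == 0 and never insert the last 999.
            if (list.size() == 1000) {
                mongoOperations.insert(list, "countDemo");
                list.clear();
            }
        }
        // Insert any remaining partial batch.
        if (!list.isEmpty()) {
            mongoOperations.insert(list, "countDemo");
        }
        return 100000;
    }

    // Exact count, executed as an aggregation.
    public long countDocuments() {
        return mongoOperations.getCollection("countDemo").countDocuments();
    }

    // Fast count, executed as the count command.
    public long estimatedDocumentCount() {
        return mongoOperations.getCollection("countDemo").estimatedDocumentCount();
    }
}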
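The batching logic in insertDemos is easy to get subtly wrong, so here is a minimal sketch of it in isolation. It assumes a hypothetical in-memory sink in place of mongoOperations.insert, and the names BatchInsertSketch and insertInBatches are made up for illustration. The point is that the flush condition is based on the batch size and that a trailing partial batch is flushed after the loop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class BatchInsertSketch {

    /**
     * Sends items 0..total-1 to the sink in batches of at most batchSize
     * and returns how many items were actually handed to the sink.
     */
    static int insertInBatches(int total, int batchSize, Consumer<List<String>> sink) {
        List<String> batch = new ArrayList<>(batchSize);
        int flushed = 0;
        for (int i = 0; i < total; i++) {
            batch.add("demo" + i);
            if (batch.size() == batchSize) {
                sink.accept(new ArrayList<>(batch)); // hand over a copy, then reuse the list
                flushed += batch.size();
                batch.clear();
            }
        }
        if (!batch.isEmpty()) { // trailing partial batch
            sink.accept(new ArrayList<>(batch));
            flushed += batch.size();
        }
        return flushed;
    }

    public static void main(String[] args) {
        List<Integer> sizes = new ArrayList<>();
        int flushed = insertInBatches(2500, 1000, b -> sizes.add(b.size()));
        System.out.println(flushed + " items in batches of sizes " + sizes);
    }
}
```

Returning the number of items actually flushed, rather than a constant, makes it obvious when documents are silently dropped.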
@Document(collection = "countDemo")
public class CountDemoEntity {
@Id
private ObjectId id;
private String demoName;
private String demo;
public ObjectId getId() {
return id;
}
public void setId(ObjectId id) {
this.id = id;
}
public String getDemoName() {
return demoName;
}
public void setDemoName(String demoName) {
this.demoName = demoName;
}
public String getDemo() {
return demo;
}
public void setDemo(String demo) {
this.demo = demo;
}
}
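To make the ObjectId layout mentioned earlier more concrete, the following sketch decodes the parts of a 24-character hex ObjectId that are stable across versions: the leading 4-byte big-endian timestamp (seconds since the Unix epoch) and the trailing 3-byte counter. It deliberately avoids the driver's ObjectId class so that it runs on its own; ObjectIdSketch and the sample id are hypothetical.

```java
import java.time.Instant;

final class ObjectIdSketch {

    /** Returns the leading 4 bytes of the id as seconds since the Unix epoch. */
    static long timestampSeconds(String hex) {
        requireObjectIdHex(hex);
        return Long.parseLong(hex.substring(0, 8), 16);
    }

    /** Returns the trailing 3-byte counter. */
    static int counter(String hex) {
        requireObjectIdHex(hex);
        return Integer.parseInt(hex.substring(18, 24), 16);
    }

    private static void requireObjectIdHex(String hex) {
        if (hex == null || hex.length() != 24) {
            throw new IllegalArgumentException("expected 24 hex characters");
        }
    }

    public static void main(String[] args) {
        String id = "615ef0afa1b2c3d4e5000001"; // made-up sample id
        System.out.println("created at " + Instant.ofEpochSecond(timestampSeconds(id)));
        System.out.println("counter    " + counter(id));
    }
}
```

Because the timestamp comes first, ObjectId values sort roughly by creation time, which is one reason they work well as a default _id.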
@RestController
public class MongoDBController {
@Autowired
private MongoDBRepository repository;
@RequestMapping("/insertDemos")
public String insertDemos() {
repository.insertDemos();
return "ok";
}
@RequestMapping("/countDocuments")
public long countDocuments() {
long start = System.currentTimeMillis();
long count = repository.countDocuments();
System.out.println("countDocuments = " + (System.currentTimeMillis()-start));
return count;
}
@RequestMapping("/estimatedDocumentCount")
public long estimatedDocumentCount() {
long start = System.currentTimeMillis();
long count = repository.estimatedDocumentCount();
System.out.println("estimatedDocumentCount = " + (System.currentTimeMillis()-start));
return count;
}
}
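Use the insert endpoint to generate mock data
http://localhost:8080/insertDemos
I generated 6138062 documents in total
The two count calls took 237 ms and 3364 ms respectively, with estimatedDocumentCount being the faster one. The gap is very large, and it grows as the number of documents stored in MongoDB increases. From a performance standpoint, estimatedDocumentCount is therefore the recommended way to count a whole collection.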
3. Why countDocuments is not recommended
What is the reason behind this? The driver source code has to be examined.
3.1 The estimatedDocumentCount function
public long estimatedDocumentCount(final EstimatedDocumentCountOptions options) {
return executeCount(null, new BsonDocument(), fromEstimatedDocumentCountOptions(options), CountStrategy.COMMAND);
}
A strategy is involved here: estimatedDocumentCount uses the COMMAND strategy, which corresponds to MongoDB's count command
public enum CountStrategy {
/**
* Use the count command
*/
COMMAND,
/**
* Use the Aggregate command
*/
AGGREGATE
}
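Tracing further, the COMMAND strategy can be seen to issue the count command
Sending and receiving the message has a bit of an RxJava flavor: it is an observer pattern, similar to Spring's event mechanism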
public <T> T sendAndReceive(final CommandMessage message, final Decoder<T> decoder, final SessionContext sessionContext) {
ByteBufferBsonOutput bsonOutput = new ByteBufferBsonOutput(this);
CommandEventSender commandEventSender;
try {
// encode the message
message.encode(bsonOutput, sessionContext);
commandEventSender = createCommandEventSender(message, bsonOutput);
// does nothing unless logging is enabled
commandEventSender.sendStartedEvent();
} catch (RuntimeException e) {
bsonOutput.close();
throw e;
}
try {
// send the command message
sendCommandMessage(message, bsonOutput, sessionContext);
if (message.isResponseExpected()) {
// read the reply and return the result
return receiveCommandMessageResponse(decoder, commandEventSender, sessionContext, 0);
} else {
commandEventSender.sendSucceededEventForOneWayCommand();
return null;
}
} catch (RuntimeException e) {
commandEventSender.sendFailedEvent(e);
throw e;
}
}
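The observer-style flow in sendAndReceive (a started event, then either a succeeded or a failed event) can be sketched without the driver. This is a simplified illustration under my own assumptions, not the driver's implementation: CommandEventSketch, its Listener interface, and the Supplier standing in for the network round trip are all hypothetical names.

```java
import java.util.List;
import java.util.function.Supplier;

final class CommandEventSketch {

    /** Hypothetical observer interface, loosely modelled on command monitoring. */
    interface Listener {
        void started(String commandName);
        void succeeded(String commandName, long elapsedNanos);
        void failed(String commandName, RuntimeException error);
    }

    /** Runs the round trip and notifies every listener about its outcome. */
    static <T> T sendAndReceive(String commandName, Supplier<T> roundTrip, List<Listener> listeners) {
        listeners.forEach(l -> l.started(commandName));
        long start = System.nanoTime();
        T result;
        try {
            result = roundTrip.get(); // stands in for "send message, read reply"
        } catch (RuntimeException e) {
            listeners.forEach(l -> l.failed(commandName, e));
            throw e;
        }
        long elapsed = System.nanoTime() - start;
        listeners.forEach(l -> l.succeeded(commandName, elapsed));
        return result;
    }
}
```

With no listeners registered the event calls are no-ops, which matches the observation that nothing happens at that point unless logging is switched on.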
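This is the point where the command can be logged. The logging configurations suggested online did not work for me.
With Spring Boot, the following needs to be configured
logging:
  level:
    org.mongodb.driver.protocol.command: DEBUG
Only then is the command printed, and in practice this does work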
2021-10-07 21:22:55.728 DEBUG 5615 --- [nio-8080-exec-1] org.mongodb.driver.protocol.command      : Sending command '{"count": "countDemo", "query": {}, "$db": "work", "lsid": {"id": {"$binary": {"base64": "mUhZ0SIkSYWkCSQNo6GdrA==", "subType": "04"}}}}' with request id 6 to database work on connection [connectionId{localValue:3, serverValue:81}] to server localhost:27017
Sending the command
public void sendMessage(final List<ByteBuf> byteBuffers, final int lastRequestId) {
notNull("stream is open", stream);
if (isClosed()) {
throw new MongoSocketClosedException("Cannot write to a closed stream", getServerAddress());
}
try {
stream.write(byteBuffers);
} catch (Exception e) {
close();
throw translateWriteException(e);
}
}
Receiving the result: the reply consists of a header and a body buffer, and it can also arrive compressed, which reduces network I/O
private ResponseBuffers receiveResponseBuffers(final int additionalTimeout) throws IOException {
ByteBuf messageHeaderBuffer = stream.read(MESSAGE_HEADER_LENGTH, additionalTimeout);
MessageHeader messageHeader;
try {
messageHeader = new MessageHeader(messageHeaderBuffer, description.getMaxMessageSize());
} finally {
messageHeaderBuffer.release();
}
ByteBuf messageBuffer = stream.read(messageHeader.getMessageLength() - MESSAGE_HEADER_LENGTH, additionalTimeout);
if (messageHeader.getOpCode() == OP_COMPRESSED.getValue()) {
CompressedHeader compressedHeader = new CompressedHeader(messageBuffer, messageHeader);
Compressor compressor = getCompressor(compressedHeader);
ByteBuf buffer = getBuffer(compressedHeader.getUncompressedSize());
compressor.uncompress(messageBuffer, buffer);
buffer.flip();
return new ResponseBuffers(new ReplyHeader(buffer, compressedHeader), buffer);
} else {
return new ResponseBuffers(new ReplyHeader(messageBuffer, messageHeader), messageBuffer);
}
}
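The header handling in receiveResponseBuffers follows the MongoDB wire protocol, in which every message starts with a 16-byte header of four little-endian int32 fields: messageLength, requestID, responseTo, and opCode. The sketch below parses such a header from a plain ByteBuffer. MessageHeaderSketch is a hypothetical stand-in for the driver's MessageHeader class; the op code values 2012 (OP_COMPRESSED) and 2013 (OP_MSG) are taken from the wire protocol.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

final class MessageHeaderSketch {
    static final int MESSAGE_HEADER_LENGTH = 16;
    static final int OP_COMPRESSED = 2012;
    static final int OP_MSG = 2013;

    final int messageLength; // total size in bytes, header included
    final int requestId;     // id of this message
    final int responseTo;    // request id this message answers
    final int opCode;        // message type

    MessageHeaderSketch(ByteBuffer buffer) {
        if (buffer.remaining() < MESSAGE_HEADER_LENGTH) {
            throw new IllegalArgumentException("header needs 16 bytes");
        }
        // Work on a little-endian view so the caller's buffer position is untouched.
        ByteBuffer le = buffer.duplicate().order(ByteOrder.LITTLE_ENDIAN);
        messageLength = le.getInt();
        requestId = le.getInt();
        responseTo = le.getInt();
        opCode = le.getInt();
    }

    /** Number of bytes that still have to be read after the header. */
    int bodyLength() {
        return messageLength - MESSAGE_HEADER_LENGTH;
    }

    boolean isCompressed() {
        return opCode == OP_COMPRESSED;
    }
}
```

This mirrors the two-step read in the driver code: first the fixed-size header, then exactly messageLength minus 16 bytes of body, which is decompressed first when the op code says so.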
3.2 The countDocuments method
public long countDocuments(final Bson filter, final CountOptions options) {
return executeCount(null, filter, options, CountStrategy.AGGREGATE);
}
This uses the AGGREGATE strategy, and the result is read through a cursor
So let's look at what command actually gets built
BsonDocument getCommand(final SessionContext sessionContext) {
BsonDocument commandDocument = new BsonDocument("aggregate", aggregateTarget.create());
appendReadConcernToCommand(sessionContext, commandDocument);
commandDocument.put("pipeline", pipelineCreator.create());
if (maxTimeMS > 0) {
commandDocument.put("maxTimeMS", maxTimeMS > Integer.MAX_VALUE
? new BsonInt64(maxTimeMS) : new BsonInt32((int) maxTimeMS));
}
BsonDocument cursor = new BsonDocument();
if (batchSize != null) {
cursor.put("batchSize", new BsonInt32(batchSize));
}
commandDocument.put(CURSOR, cursor);
if (allowDiskUse != null) {
commandDocument.put("allowDiskUse", BsonBoolean.valueOf(allowDiskUse));
}
if (collation != null) {
commandDocument.put("collation", collation.asDocument());
}
if (comment != null) {
commandDocument.put("comment", new BsonString(comment));
}
if (hint != null) {
commandDocument.put("hint", hint);
}
return commandDocument;
}
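Inspecting the generated command shows that the count is computed with $sum: every document in the collection is scanned and added up, so it is no wonder the call is slow. This is acceptable when a selective filter narrows the scan, but not when it runs over the entire collection.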
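The cost difference can be illustrated with a toy model. This is only an analogy under my own assumptions, not how the server is implemented: ToyCollection keeps a counter that is updated on every insert (standing in for collection metadata), while countMatching walks every document the way a $match plus $group with $sum pipeline has to.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

final class ToyCollection {
    private final List<String> docs = new ArrayList<>();
    private long metadataCount; // maintained on every insert

    void insert(String doc) {
        docs.add(doc);
        metadataCount++;
    }

    /** Constant time: just reads the maintained counter. */
    long estimatedCount() {
        return metadataCount;
    }

    /** Linear time: visits every document and adds 1 per match. */
    long countMatching(Predicate<String> filter) {
        long n = 0;
        for (String doc : docs) {
            if (filter.test(doc)) {
                n += 1;
            }
        }
        return n;
    }

    public static void main(String[] args) {
        ToyCollection c = new ToyCollection();
        for (int i = 0; i < 1000; i++) {
            c.insert("demo" + i);
        }
        System.out.println(c.estimatedCount() + " vs " + c.countMatching(d -> true));
    }
}
```

Both calls return the same number for an empty filter, but only the second one does work proportional to the collection size, which is consistent with the gap growing as documents are added.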
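The rest of the logic follows the flow described in 3.1, except that the result is received through a cursor, and receiving is blocking until the result arrives
Let's trace it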
public boolean hasNext() {
if (closed) {
throw new IllegalStateException("Cursor has been closed");
}
if (nextBatch != null) {
return true;
}
if (limitReached()) {
return false;
}
while (serverCursor != null) {
getMore();
if (closed) {
throw new IllegalStateException("Cursor has been closed");
}
if (nextBatch != null) {
return true;
}
}
return false;
}
The key is getMore
private void getMore() {
Connection connection = connectionSource.getConnection();
try {
// server 3.2 and above; 3.2 evidently brought major changes
if (serverIsAtLeastVersionThreeDotTwo(connection.getDescription())) {
try {
initFromCommandResult(connection.command(namespace.getDatabaseName(),
asGetMoreCommandDocument(),
NO_OP_FIELD_NAME_VALIDATOR,
ReadPreference.primary(),
CommandResultDocumentCodec.create(decoder, "nextBatch"),
connectionSource.getSessionContext()));
} catch (MongoCommandException e) {
throw translateCommandException(e, serverCursor);
}
} else {
QueryResult<T> getMore = connection.getMore(namespace, serverCursor.getId(),
getNumberToReturn(limit, batchSize, count), decoder);
initFromQueryResult(getMore);
}
if (limitReached()) {
killCursor(connection);
}
} finally {
connection.release();
releaseConnectionSourceIfNoServerCursor();
}
}
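The hasNext and getMore pair boils down to a lazily refilled batch buffer. Here is a minimal sketch with an in-memory range of integers standing in for the server-side cursor; BatchCursorSketch is a hypothetical name. One simplification to be aware of: in this sketch every batch, including the first, comes from getMore, whereas in the driver the first batch arrives with the reply to the initial command.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.NoSuchElementException;

final class BatchCursorSketch implements Iterator<Integer> {
    private final int total;      // number of results the fake server holds
    private final int batchSize;  // results returned per getMore
    private final Deque<Integer> nextBatch = new ArrayDeque<>();
    private int fetched;          // results fetched from the fake server so far
    private int getMoreCalls;

    BatchCursorSketch(int total, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.total = total;
        this.batchSize = batchSize;
    }

    @Override
    public boolean hasNext() {
        // Keep asking for more while the local batch is empty and the server has results left.
        while (nextBatch.isEmpty() && fetched < total) {
            getMore();
        }
        return !nextBatch.isEmpty();
    }

    @Override
    public Integer next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return nextBatch.poll();
    }

    private void getMore() {
        getMoreCalls++;
        int end = Math.min(total, fetched + batchSize);
        for (int i = fetched; i < end; i++) {
            nextBatch.add(i);
        }
        fetched = end;
    }

    int getMoreCalls() {
        return getMoreCalls;
    }
}
```

For the count pipeline the result is a single document, so the cursor machinery itself is unlikely to be the bottleneck; the time goes into the server producing that one document.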
Execution result
The log likewise shows what is going on
2021-10-07 21:47:20.363 DEBUG 5615 --- [nio-8080-exec-8] org.mongodb.driver.protocol.command      : Sending command '{"aggregate": "countDemo", "pipeline": [{"$match": {}}, {"$group": {"_id": 1, "n": {"$sum": 1}}}], "cursor": {}, "$db": "work", "lsid": {"id": {"$binary": {"base64": "mUhZ0SIkSYWkCSQNo6GdrA==", "subType": "04"}}}}' with request id 29 to database work on connection [connectionId{localValue:3, serverValue:81}] to server localhost:27017
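To compare the two strategies side by side, the sketch below rebuilds the core of both command documents as JSON strings, in the shape seen in the debug logs (without the $db and lsid fields that the driver appends). It is a string-building illustration only, not the driver's code; CountCommandSketch and commandFor are hypothetical names, and the local CountStrategy enum merely mirrors the one quoted from the driver.

```java
final class CountCommandSketch {

    enum CountStrategy { COMMAND, AGGREGATE }

    /** Builds the command document for the given strategy as a JSON string. */
    static String commandFor(CountStrategy strategy, String collection, String filterJson) {
        switch (strategy) {
            case COMMAND:
                // estimatedDocumentCount: a plain count command
                return "{\"count\": \"" + collection + "\", \"query\": " + filterJson + "}";
            case AGGREGATE:
                // countDocuments: $match on the filter, then $group with $sum: 1
                return "{\"aggregate\": \"" + collection + "\", \"pipeline\": [{\"$match\": " + filterJson
                        + "}, {\"$group\": {\"_id\": 1, \"n\": {\"$sum\": 1}}}], \"cursor\": {}}";
            default:
                throw new IllegalArgumentException("unknown strategy: " + strategy);
        }
    }

    public static void main(String[] args) {
        System.out.println(commandFor(CountStrategy.COMMAND, "countDemo", "{}"));
        System.out.println(commandFor(CountStrategy.AGGREGATE, "countDemo", "{}"));
    }
}
```

Seen this way, the only input the two calls share is the collection name and the filter; everything else about how the count is obtained differs.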
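Summary
The source code analysis shows that countDocuments counts by summing with $sum in an aggregation, which suits counts with a selective filter, whereas the count command behind estimatedDocumentCount is the right fit for counting an entire collection, keeping in mind that it returns an estimate based on collection metadata. The official documentation covers this as well: see the db.collection.countDocuments() page of the MongoDB Manual.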