1. Introduction to MongoShake
MongoShake is a general-purpose platform service written in Golang by Alibaba Cloud. It replicates MongoDB data by tailing MongoDB's oplog, which makes it usable for a wide range of replication needs.
MongoShake also exposes the oplog stream for subscription and consumption: downstream consumers can connect flexibly through an SDK, Kafka, MetaQ, and so on, covering scenarios such as log subscription, data-center synchronization, and asynchronous cache invalidation.
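MongoShake's raw input is the local.oplog.rs collection that every replica set member maintains. If you want to see what it actually consumes, the newest oplog entry can be inspected from the mongo shell; a quick illustration (the port is the one used in the test environment below):

mongo --port 40001 --quiet --eval 'printjson(db.getSiblingDB("local").getCollection("oplog.rs").find().sort({$natural: -1}).limit(1).next())'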
Download address:
Software directory structure:
[root@dn5 software]# tree mongoshake
mongoshake
├── ChangeLog
├── collector.conf
├── collector.conf.db2db
├── collector.darwin
├── collector.linux
├── collector.windows
├── comparison.py
├── diagnostic
│ └── mongoshake.testrs.journal
├── hypervisor
├── logs
│ ├── collector.log
│ ├── collector.log.1
│ ├── mongoshake.log
│ ├── mongoshake.log.1
│ ├── mongoshake.log.2
│ ├── mongoshake.log.3
│ ├── mongoshake.log.4
│ ├── mongoshake.log.5
│ ├── mongoshake.log.6
│ ├── mongoshake.log.7
│ ├── receiver.log
│ └── receiver.log.1
├── mongoshake.pid
├── mongoshake-stat
├── receiver.conf
├── receiver.darwin
├── receiver.linux
├── receiver.windows
├── start.sh
└── stop.sh
2 directories, 30 files
2. Data Sources Supported by MongoShake
| Source database | Target database |
| --- | --- |
| Self-managed MongoDB on ECS | Self-managed MongoDB on ECS |
| Self-managed on-premises MongoDB | Self-managed on-premises MongoDB |
| Alibaba Cloud ApsaraDB for MongoDB instance | Alibaba Cloud ApsaraDB for MongoDB instance |
| Third-party cloud MongoDB | Third-party cloud MongoDB |
Important notes:
- 1. As the table above shows, the source and target database types must be the same. Synchronizing between different kinds of data sources (for example, from a cloud MongoDB to a self-managed on-premises MongoDB) is not supported and may fail. A known case: synchronizing a Huawei Cloud MongoDB ReplicaSet cluster to a self-managed on-premises database fails with the error: Oplog Tailer initialize failed: no oplog ns in mongo.
- 2. A standalone (single-node) MongoDB does not generate an oplog, so the source MongoDB must be a replica set or cluster deployment. If a standalone instance was installed, starting mongoshake reports the following error (a sketch for converting a standalone into a one-member replica set follows the error output):
[root@cdh-test5 mongoshake]
[2022/04/21 11:08:12 CST] [WARN]
 ______________________________
\                             \           _         ______ |
 \                             \        /   \___-=O'/|O'/__|
  \  MongoShake, Here we go !!  \_______\          / | /    )
  /                             /        '/-==__ _/__|/__=-|  -GM
 /        Alibaba Cloud        /         *             \ | |
/                             /                        (o)
 ------------------------------
if you have any problem, please visit https://github.com/alibaba/MongoShake/wiki/FAQ
[2022/04/21 11:08:12 CST] [INFO] New session to mongodb://127.0.0.1:40001 successfully
[2022/04/21 11:08:12 CST] [INFO] Close session with mongodb://127.0.0.1:40001
[2022/04/21 11:08:12 CST] [INFO] New session to mongodb://127.0.0.1:40001 successfully
[2022/04/21 11:08:12 CST] [CRIT] There has no oplog collection in mongo db server
[2022/04/21 11:08:12 CST] [INFO] Close session with mongodb://127.0.0.1:40001
[11:08:12 CST 2022/04/21] [CRIT] (mongoshake/collector/coordinator.(*ReplicationCoordinator).sanitizeMongoDB:134) There has no oplog collection in mongo db server
no oplog ns in mongo. See https://github.com/alibaba/MongoShake/wiki/FAQ
[2022/04/21 11:08:12 CST] [CRIT] run replication failed: no oplog ns in mongo. See https://github.com/alibaba/MongoShake/wiki/FAQ
[2022/04/21 11:08:12 CST] [WARN]
Oh we finish ?  (MongoShake exit banner, ASCII art omitted)
[11:08:12 CST 2022/04/21] [CRIT] (main.startup:139) run replication failed: no oplog ns in mongo. See https://github.com/alibaba/MongoShake/wiki/FAQ
[root@cdh-test5 mongoshake]
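Even a one-member replica set is enough to get an oplog, so the fix for the error above is to restart the standalone mongod with a replica set name and initiate it. A minimal sketch (the port, dbpath, logpath, and the replica set name testrs are assumptions matching this test environment):

# restart the standalone mongod as a one-member replica set; this enables local.oplog.rs
mongod --port 40001 --dbpath /data/mongodb/data --replSet testrs --fork --logpath /data/mongodb/mongod.log
# initiate the replica set once, then verify the oplog collection now exists
mongo --port 40001 --quiet --eval 'rs.initiate()'
mongo --port 40001 --quiet --eval 'printjson(db.getSiblingDB("local").getCollectionNames())'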
For installing a MongoDB cluster (replica set), see my other article:
3. First Look at MongoShake
Here we only demonstrate using MongoShake to import MongoDB data into Kafka in real time.
Implementing this in MongoShake is straightforward: you only need to configure the collector (which connects to the source MongoDB, fetches the oplog, and pushes it into the tunnel, i.e. Kafka here) and the receiver (which consumes the tunnel and parses and replays the oplog for the downstream sink).
3.1 Configuration & Startup
1) Collector-side configuration
- collector.conf
# configuration schema version
conf.version = 6

# incremental sync only (no initial full sync)
sync_mode = incr

# source MongoDB replica set members
mongo_urls = mongodb://111.111.111.111:40001,111.111.111.112:40002,111.111.111.113:40003

# ship oplog entries into a Kafka tunnel; address format: topic@broker_list
tunnel = kafka
tunnel.address = mongo_shake_test@dn3.test.com:9092,dn4.test.com:9092,dn5.test.com:9092
tunnel.message = json

# eight concurrent sync workers, fetching changes from the oplog (alternative: change_stream)
incr_sync.worker = 8
incr_sync.mongo_fetch_method = oplog
- Collector-side startup
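A typical invocation simply points the Linux binary at the config file (the paths below follow the directory layout shown earlier; adjust them to your install):

nohup ./collector.linux -conf=collector.conf &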
[root@dn5 bin]
[2022/04/21 20:54:23 CST] [WARN]
 ______________________________
\                             \           _         ______ |
 \                             \        /   \___-=O'/|O'/__|
  \  MongoShake, Here we go !!  \_______\          / | /    )
  /                             /        '/-==__ _/__|/__=-|  -GM
 /        Alibaba Cloud        /         *             \ | |
/                             /                        (o)
 ------------------------------
if you have any problem, please visit https://github.com/alibaba/MongoShake/wiki/FAQ
[2022/04/21 20:54:23 CST] [INFO] New session to mongodb://111.111.111.111:40001 successfully
[2022/04/21 20:54:23 CST] [INFO] Close session with mongodb://111.111.111.111:40001
[2022/04/21 20:54:23 CST] [INFO] New session to mongodb://111.111.111.111:40001 successfully
[2022/04/21 20:54:23 CST] [INFO] Close session with mongodb://111.111.111.111:40001
[2022/04/21 20:54:23 CST] [INFO] Collector startup. shard_by[collection] gids[[]]
[2022/04/21 20:54:23 CST] [INFO] Collector configuration {"ConfVersion":6,"Id":"mongoshake","MasterQuorum":false,"FullSyncHTTPListenPort":9101,"IncrSyncHTTPListenPort":9100,"SystemProfilePort":9200,"LogLevel":"info","LogDirectory":"","LogFileName":"mongoshake.log","LogFlush":false,"SyncMode":"incr","MongoUrls":["mongodb://111.111.111.111:40001"],"MongoCsUrl":"","MongoSUrl":"","MongoConnectMode":"secondaryPreferred","Tunnel":"kafka","TunnelAddress":["mongo_shake_test@dn3.testhdp.com:9092,dn4.testhdp.com:9092,dn5.testhdp.com:9092"],"TunnelMessage":"json","FilterNamespaceBlack":null,"FilterNamespaceWhite":null,"FilterPassSpecialDb":null,"FilterDDLEnable":false,"CheckpointStorageUrl":"mongodb://111.111.111.111:40001","CheckpointStorageDb":"mongoshake","CheckpointStorageCollection":"ckpt_default","CheckpointStartPosition":1,"TransformNamespace":null,"FullSyncReaderCollectionParallel":6,"FullSyncReaderWriteDocumentParallel":8,"FullSyncReaderReadDocumentCount":0,"FullSyncReaderDocumentBatchSize":128,"FullSyncCollectionDrop":false,"FullSyncCreateIndex":"foreground","FullSyncReaderOplogStoreDisk":false,"FullSyncReaderOplogStoreDiskMaxSize":256000,"FullSyncExecutorInsertOnDupUpdate":false,"FullSyncExecutorFilterOrphanDocument":false,"FullSyncExecutorMajorityEnable":false,"IncrSyncMongoFetchMethod":"oplog","IncrSyncChangeStreamWatchFullDocument":false,"IncrSyncOplogGIDS":null,"IncrSyncShardKey":"collection","IncrSyncShardByObjectIdWhiteList":null,"IncrSyncWorker":8,"IncrSyncTargetDelay":0,"IncrSyncWorkerOplogCompressor":"none","IncrSyncWorkerBatchQueueSize":64,"IncrSyncAdaptiveBatchingMaxSize":1024,"IncrSyncFetcherBufferCapacity":256,"IncrSyncExecutorUpsert":false,"IncrSyncExecutorInsertOnDupUpdate":false,"IncrSyncConflictWriteTo":"none","IncrSyncExecutorMajorityEnable":false,"CheckpointStorage":"database","CheckpointInterval":5000,"FullSyncExecutorDebug":false,"IncrSyncDBRef":false,"IncrSyncExecutor":1,"IncrSyncExecutorDebug":false,"IncrSyncReaderDebug":"","IncrSyncCollisionEnable":false,"IncrSyncReaderBufferTime":1,"Version":"improve-2.4.16,7bd0a9be075037779760fe8704ea2e95d25d908b,release,go1.10.3,2020-11-04_14:33:56","IncrSyncTunnel":"","IncrSyncTunnelAddress":null,"IncrSyncTunnelMessage":"","HTTPListenPort":0,"SystemProfile":0}
[2022/04/21 20:54:23 CST] [INFO] New session to mongodb://111.111.111.111:40001 successfully
[2022/04/21 20:54:23 CST] [INFO] Close session with mongodb://111.111.111.111:40001
[2022/04/21 20:54:23 CST] [INFO] New session to mongodb://111.111.111.111:40001 successfully
[2022/04/21 20:54:23 CST] [INFO] Close session with mongodb://111.111.111.111:40001
[2022/04/21 20:54:23 CST] [INFO] all node timestamp map: map[testrs:{7088979101280632833 7089039621664800769}]
[2022/04/21 20:54:23 CST] [INFO] New session to mongodb://111.111.111.111:40001 successfully
[2022/04/21 20:54:23 CST] [INFO] testrs Regenerate checkpoint but won't persist. content: {"name":"testrs","ckpt":1,"version":2,"fetch_method":"","oplog_disk_queue":"","oplog_disk_queue_apply_finish_ts":1}
[2022/04/21 20:54:23 CST] [INFO] testrs checkpoint using mongod/replica_set: {"name":"testrs","ckpt":1,"version":2,"fetch_method":"","oplog_disk_queue":"","oplog_disk_queue_apply_finish_ts":1}, ckptRemote set? [false]
[2022/04/21 20:54:23 CST] [INFO] sync mode run incr
[2022/04/21 20:54:23 CST] [INFO] start running with mode[incr], fullBeginTs[0[0, 0]]
[2022/04/21 20:54:23 CST] [INFO] start incr replication
[2022/04/21 20:54:23 CST] [INFO] RealSourceIncrSync[0]: url[mongodb://111.111.111.111:40001], name[testrs], startTimestamp[4294967296]
[2022/04/21 20:54:23 CST] [INFO] Collector-worker-0 start working with jobs batch queue. buffer capacity 64
[2022/04/21 20:54:23 CST] [INFO] Collector-worker-1 start working with jobs batch queue. buffer capacity 64
[2022/04/21 20:54:23 CST] [INFO] Collector-worker-2 start working with jobs batch queue. buffer capacity 64
[2022/04/21 20:54:23 CST] [INFO] Collector-worker-3 start working with jobs batch queue. buffer capacity 64
[2022/04/21 20:54:23 CST] [INFO] Collector-worker-4 start working with jobs batch queue. buffer capacity 64
[2022/04/21 20:54:23 CST] [INFO] Collector-worker-5 start working with jobs batch queue. buffer capacity 64
[2022/04/21 20:54:23 CST] [INFO] Collector-worker-6 start working with jobs batch queue. buffer capacity 64
[2022/04/21 20:54:23 CST] [INFO] Collector-worker-7 start working with jobs batch queue. buffer capacity 64
[2022/04/21 20:54:23 CST] [CRIT] start incr sync server with port[9100] failed: listen tcp :9100: bind: address already in use
[2022/04/21 20:54:23 CST] [INFO] Syncer[testrs] poll oplog syncer start. ckpt_interval[5000ms], gid[[]], shard_key[collection]
[20:54:23 CST 2022/04/21] [CRIT] (mongoshake/collector/coordinator.(*ReplicationCoordinator).startOplogReplication.func1:69) start incr sync server with port[9100] failed: listen tcp :9100: bind: address already in use
[2022/04/21 20:54:23 CST] [INFO] Oplog sync[testrs] create checkpoint manager with url[mongodb://111.111.111.111:40001] table[mongoshake.ckpt_default] start-position[4294967296[1, 0]]
[2022/04/21 20:54:23 CST] [INFO] New session to mongodb://111.111.111.111:40001 successfully
[2022/04/21 20:54:23 CST] [INFO] testrs Regenerate checkpoint but won't persist. content: {"name":"testrs","ckpt":4294967296,"version":2,"fetch_method":"","oplog_disk_queue":"","oplog_disk_queue_apply_finish_ts":1}
[2022/04/21 20:54:23 CST] [INFO] load checkpoint value: {"name":"testrs","ckpt":4294967296,"version":2,"fetch_method":"","oplog_disk_queue":"","oplog_disk_queue_apply_finish_ts":1}
[2022/04/21 20:54:23 CST] [INFO] persister replset[testrs] update fetch status to: store memory and apply
[2022/04/21 20:54:23 CST] [INFO] testrs Regenerate checkpoint but won't persist. content: {"name":"testrs","ckpt":4294967296,"version":2,"fetch_method":"","oplog_disk_queue":"","oplog_disk_queue_apply_finish_ts":1}
[2022/04/21 20:54:23 CST] [INFO] set query timestamp: 4294967296[1, 0]
[2022/04/21 20:54:23 CST] [INFO] start fetcher with src[mongodb://111.111.111.111:40001] replica-name[testrs] query-ts[4294967296[1, 0]]
[2022/04/21 20:54:23 CST] [INFO] oplogReader[src:mongodb://111.111.111.111:40001 replset:testrs] ensure network
[2022/04/21 20:54:23 CST] [INFO] New session to mongodb://111.111.111.111:40001 successfully
[2022/04/21 20:54:23 CST] [WARN] oplog_reader current starting point[4294967296[1, 0]] is smaller than the oldest timestamp[7088979101280632833[1650531567, 1]]!
[2022/04/21 20:54:23 CST] [INFO] Collector-worker-3 transfer retransmit:false send [1] logs. reply_acked [7088991015519911938[1650534341, 2]], list_unack [0]
[2022/04/21 20:54:23 CST] [INFO] Collector-worker-6 transfer retransmit:false send [1] logs. reply_acked [7088984680443150338[1650532866, 2]], list_unack [0]
[2022/04/21 20:54:24 CST] [INFO] Collector-worker-6 transfer retransmit:false send [1] logs. reply_acked [7089039059024084993[1650545527, 1]], list_unack [0]
[2022/04/21 20:54:28 CST] [INFO] [name=testrs, stage=incr, get=1422, filter=1419, worker_consume=3, worker_apply=3, worker_failed_times=0, success=3, tps=0, ckpt_times=0, retransimit_times=0, tunnel_traffic=470.00B, lsn_ckpt={0[0, 0], 1970-01-01 08:00:00}, lsn_ack={7089039059024084993[1650545527, 1], 2022-04-21 20:52:07}]]
[2022/04/21 20:54:33 CST] [INFO] [name=testrs, stage=incr, get=1423, filter=1420, worker_consume=3, worker_apply=3, worker_failed_times=0, success=3, tps=0, ckpt_times=0, retransimit_times=0, tunnel_traffic=470.00B, lsn_ckpt={0[0, 0], 1970-01-01 08:00:00}, lsn_ack={7089039059024084993[1650545527, 1], 2022-04-21 20:52:07}]]
[2022/04/21 20:54:38 CST] [INFO] [name=testrs, stage=incr, get=1423, filter=1420, worker_consume=3, worker_apply=3, worker_failed_times=0, success=3, tps=0, ckpt_times=0, retransimit_times=0, tunnel_traffic=470.00B, lsn_ckpt={0[0, 0], 1970-01-01 08:00:00}, lsn_ack={7089039059024084993[1650545527, 1], 2022-04-21 20:52:07}]]
[2022/04/21 20:54:43 CST] [INFO] [name=testrs, stage=incr, get=1424, filter=1421, worker_consume=3, worker_apply=3, worker_failed_times=0, success=3, tps=0, ckpt_times=0, retransimit_times=0, tunnel_traffic=470.00B, lsn_ckpt={0[0, 0], 1970-01-01 08:00:00}, lsn_ack={7089039059024084993[1650545527, 1], 2022-04-21 20:52:07}]]
[2022/04/21 20:54:48 CST] [INFO] [name=testrs, stage=incr, get=1424, filter=1421, worker_consume=3, worker_apply=3, worker_failed_times=0, success=3, tps=0, ckpt_times=0, retransimit_times=0, tunnel_traffic=470.00B, lsn_ckpt={0[0, 0], 1970-01-01 08:00:00}, lsn_ack={7089039059024084993[1650545527, 1], 2022-04-21 20:52:07}]]
[2022/04/21 20:54:53 CST] [INFO] [name=testrs, stage=incr, get=1426, filter=1422, worker_consume=3, worker_apply=3, worker_failed_times=0, success=3, tps=0, ckpt_times=0, retransimit_times=0, tunnel_traffic=470.00B, lsn_ckpt={0[0, 0], 1970-01-01 08:00:00}, lsn_ack={7089039059024084993[1650545527, 1], 2022-04-21 20:52:07}]]
[2022/04/21 20:54:54 CST] [INFO] Syncer[testrs] batcher flushes cached oplog
[2022/04/21 20:54:54 CST] [INFO] Collector-worker-6 transfer retransmit:false send [1] logs. reply_acked [7089039767693688833[1650545692, 1]], list_unack [0]
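Note the two CRIT lines in the output above: port 9100 (the collector's incremental-sync HTTP status port, shown as IncrSyncHTTPListenPort in the config dump) was already taken, here by an earlier collector instance that had not been stopped. Syncing itself continues, as the subsequent log lines show, but the status endpoint is not served. Finding the conflicting process looks like this (assumes ss from iproute2):

ss -lntp | grep ':9100'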
2) Receiver-side configuration & startup
- receiver.conf
[root@dn5 mongoshake]
# logging
log.level = info
log.dir =
log.file = receiver.log
log.flush = false
# pprof port; must not clash with the collector's ports (9100/9101/9200)
system_profile_port = 9500
# consume the same Kafka tunnel the collector writes to; format: topic@broker_list
tunnel = kafka
tunnel.address = mongo_shake_test@dn3.test.com:9092,dn4.test.com:9092,dn5.test.com:9092
# number of replayer threads; this should equal the collector's incr_sync.worker
replayer = 8
- Receiver-side startup
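Startup mirrors the collector: point the binary at its config file (same path assumptions as above):

nohup ./receiver.linux -conf=receiver.conf &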
[root@dn5 mongoshake]
[2022/04/20 22:30:18 CST] [INFO] ExampleReplayer start. pending queue capacity 256
[2022/04/20 22:30:18 CST] [INFO] ExampleReplayer start. pending queue capacity 256
[2022/04/20 22:30:18 CST] [INFO] ExampleReplayer start. pending queue capacity 256
[2022/04/20 22:30:18 CST] [INFO] ExampleReplayer start. pending queue capacity 256
[2022/04/20 22:30:18 CST] [INFO] ExampleReplayer start. pending queue capacity 256
[2022/04/20 22:30:18 CST] [INFO] ExampleReplayer start. pending queue capacity 256
[2022/04/20 22:30:18 CST] [INFO] ExampleReplayer start. pending queue capacity 256
[2022/04/20 22:30:18 CST] [INFO] ExampleReplayer start. pending queue capacity 256
[2022/04/20 22:30:18 CST] [INFO] receiver is starting...
3) Watching data changes on the Kafka side
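The tunnel topic can be watched with the stock Kafka console consumer; the broker and topic names come from tunnel.address above:

kafka-console-consumer.sh --bootstrap-server dn3.test.com:9092 --topic mongo_shake_test --from-beginning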
[root@dn3 ~]
...
{"ts":7088984680443150338,"h":536529010439892526,"v":2,"op":"i","ns":"runoob.runoob","o":[{"Name":"_id","Value":"626122023df47cb4576149bb"},{"Name":"name","Value":"David"}],"o2":null}
{"ts":7088991015519911938,"h":7122990587426851361,"v":2,"op":"i","ns":"test.runoob","o":[{"Name":"_id","Value":"626127c52a312206504466ec"},{"Name":"name","Value":"Andy"}],"o2":null}
{"ts":7088984680443150338,"h":536529010439892526,"v":2,"op":"i","ns":"runoob.runoob","o":[{"Name":"_id","Value":"626122023df47cb4576149bb"},{"Name":"name","Value":"David"}],"o2":null}
{"ts":7088991015519911938,"h":7122990587426851361,"v":2,"op":"i","ns":"test.runoob","o":[{"Name":"_id","Value":"626127c52a312206504466ec"},{"Name":"name","Value":"Andy"}],"o2":null}
{"ts":7089039059024084993,"h":6914250978284002788,"v":2,"op":"i","ns":"runoob.runoob","o":[{"Name":"_id","Value":"626153772a312206504466ed"},{"Name":"name","Value":"Echo"}],"o2":null}
{"ts":7089039767693688833,"h":9017971889662735601,"v":2,"op":"i","ns":"runoob.runoob","o":[{"Name":"_id","Value":"6261541c2a312206504466ee"},{"Name":"name","Value":"John"}],"o2":null}
{"ts":7089040119881007106,"h":4077432025564863895,"v":2,"op":"i","ns":"runoob.test","o":[{"Name":"_id","Value":"6261546e2a312206504466ef"},{"Name":"name","Value":"lily"},{"Name":"age","Value":"20"}],"o2":null}
4. Troubleshooting
Q: How to solve the "Oplog Tailer initialize failed" error?
A: If the error mentions a syncer error, check whether the source database can be reached with the mongo shell. If it mentions a worker error, check your tunnel configuration.
Q: How to solve the "Oplog Tailer initialize failed: no reachable servers" error?
A: First, check that your MongoDB is reachable. This error also occurs if you configure only a single node in mongo_urls. We highly recommend configuring the full MongoDB address list, including primary, secondary, and hidden nodes, whether the source is a replica set or a sharded cluster; but if you insist on configuring a single node, set mongo_connect_mode = standalone, which has been supported since v2.0.6.
Q: How to solve the "error type[*mgo.QueryError] error[no such cmd: applyOps]" error?
A: applyOps in DDL is not supported for sharding.
Q: How to solve the "Oplog Tailer initialize failed: no oplog ns in mongo" error?
A: This is usually an account-permission problem, so check that the account can read the oplog collection (local.oplog.rs). If the source is a sharded cluster, the account must be created on every shard, because there is no local database on mongos; in that case mongo_urls should list the shard addresses separated by semicolons (;), for example: mongo_urls: mongodb://user1:passwd1@10.1.1.1:20011,10.1.1.2:20112;mongodb://user2:passwd2@10.1.2.1:20011,10.1.2.2:20112. Since v2.0.6, MongoShake no longer throws this error when the sync mode is full sync (sync_mode = document).