背景
最初研究的目的是为了解决PG - ES之间的同步,后来参考debezium 发现可以通过**逻辑复制(logical replication)**可以实现一个更通用的PG行级数据变更中间件, 通过自定义处理逻辑完成数据处理或者同步。因此你可以基于此编写一个同步到任意类型的数据库, 甚至可以做数据日志、数据缓存、实时可视化、数据仓库等等。至于为什么使用逻辑复制,各位可以自行了解逻辑复制的优势。
原理
数据库操作所产生的wal日志可以设置输出级别,将wal日志级别开放至logical ,即可监控到数据库的写入操作,再通过逻辑解析模块初步进行内容解析,再由plugin 对中间结果进行过滤和消息化拼接后, 发布到复制槽中。而我们的程序需要做的则是“监听复制槽、处理收到的消息”。
应用场景
- PG to ES
- PG to MySQL
- PG to kakfa
- 数据监控
- 消息通知
- 数据分析
代码实现及应用
Tool for synchronizing from PostgreSQL to custom handler through replication slot: https://github.com/atopx/logical
example
package main
import (
"context"
"fmt"
"github.com/jackc/pgx"
"github.com/atopx/logical/logger"
"github.com/atopx/logical/client"
"github.com/atopx/logical/model"
)
func callback(records []*model.Waldata) {
for i := 0; i < len(records); i++ {
data := records[i]
fmt.Println(*data)
model.ReleaseWaldata(data)
}
}
func main() {
cfg := pgx.ConnConfig{
Host: "127.0.0.1",
Port: 5432,
User: "itmeng",
Password: "postgres_logical",
Database: "webstore",
}
table := "book"
slot := "book_cache_slot"
c, err := client.New(cfg, table, slot, callback)
if err != nil {
logger.Panic(err.Error())
}
logger.Info("start postgresql logical replication client")
if err = c.Start(context.Background()); err != nil {
logger.Panic(err.Error())
}
}
|