不等连接的小应用
有时候我面试程序员,会问一个问题:一个关系型数据库中的表,它的主键是自增id,我删掉了一行,这一行不在开头和末尾。其它信息未知的情况下,能否用一条查询找到被删掉的id?
这里其实有一个很简单的解法:
select l.id+1 from data as l left join data as r on l.id+1 = r.id where r.id is null;
它的思路是利用自增数列的特性,用简单的数值计算构造出两个错位的数列,然后找出缺失的位置。当然,这里面还包含了末尾的一行数据,不过这个题我一般会强调是用在数据清洗之类的操作性任务中,并非自动化的程序逻辑,所以人工看一眼就能排查掉不必要的数据。我最早遇到这类问题,就是在早年出现场为客户做数据清洗和维护的过程中。这类不严谨但是方便的数据查询技巧,其实可以给工程师的日常工作带来很多便利。
周末我临时发现我手头的一个数据集内容有问题,需要重新整理,数据量不大不小,几十个GB,两百多万行,在我用来导数据的旧电脑上,要花几个小时,所以我为了查看进度和分析优化,在数据库中加了一个表:
create log(
id serial primary key,
ts timestamp default now(),
line_number integer
);
create index on log(ds desc);
我的数据处理程序每一千条写入一次数据库,于是我就在每次写入的时候,让它顺便在 log 表里写入最后一行的行号:
val source = Source.fromIterator(() => lines.getLines())
val action = source
.map({ line:String =>
val number = counter.incrementAndGet()
(number, line)
})
.drop(skip)
.mapAsync(parallels)(tuple => Future(decode(tuple._1, tuple._2)))
.filter(_.isDefined).map(_.get)
.grouped(batchSize)
.mapAsync(parallels)({ articles =>
val max = articles.map(_.line).max
val min = articles.map(_.line).min
val line_number = max
...
val count = sql"insert into log(line_number) select ${line_number}"
db.run(DBIO.seq(actions, count.asUpdate)).recover({
case err: BatchUpdateException =>
println(f"save batch failed $err at line in [${min}, ${max}]")
case err: Exception =>
println(f"catch a unknown exception when batch save $err in lines [$min, $max]")
})
})
val workflow = action.runWith(Sink.ignore)
Await.ready(workflow, Duration.Inf)
这个程序并不严谨,它没有用log4j正规的输出日志,我直接将程序运行在 tmux 里,用 shell 重定向 stdout 到一个文件中,然后 detach 后在终端 tail -f。而要查看处理速度和进度,则在另一个终端里,用 psql 连上数据库,执行:
with t as (select r.ts, (r.line_number - line_number) as processed, extract(epoch from r.ts - l.ts) as seconds, line_number
from log as l join log as r on l.id +1 = r.id
where l.ts > now()-'120 seconds'::interval) select ts, processed, seconds, case when seconds > 0 then processed/seconds else null end as "per senconds", amount from t order by ts;
它的输出结果大概是:
ts | processed | seconds | per senconds | amount
----------------------------+-----------+-----------+--------------------+---------
2021-09-12 15:33:30 | 1000 | 9.07 | 110.15 | 1760227
2021-09-12 15:33:43 | 1000 | 13.23 | 75.5515 | 1761227
...
(15 rows)
这样一个结构。是两分钟内每一批次提交的记录中,后一批的最后一行行号,减前一批,这个数据并不算很严谨,因为 map async 会导致速度快的时候,可能顺序是混乱的。但是比起select count(*) from? 的代价要小的多,更重要的是我其实当时想要把数据清洗阶段 fliter 掉的数据也计算在内。
当然,要做严谨准确的话,仍然有很多值得打磨的地方。比如我们也许可以借助递归查询写出更简洁干净的不等连接。但是我当时就是想快速的写出一个程序运行,这个方案是最不费脑子的。过程中用到了 scala,slick 、akka stream 和 postgresql,达到了一个比较满意的结果,我没有很费力,我的老电脑也没有运行的很低效。并且最终我可以看到每一个出错被跳过的位置在哪里,必要的时候,可以逐个分析处理。
|