IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 不等连接的小应用 -> 正文阅读

[大数据]不等连接的小应用

不等连接的小应用

有时候我面试程序员,会问一个问题:一个关系型数据库中的表,它的主键是自增id,我删掉了一行,这一行不在开头和末尾。其它信息未知的情况下,能否用一条查询找到被删掉的id?

这里其实有一个很简单的解法:

select l.id+1 from data as l left join data as r on l.id+1 = r.id where r.id is null;

它的思路是利用自增数列的特性,用简单的数值计算构造出两个错位的数列,然后找出缺失的位置。当然,这里面还包含了末尾的一行数据,不过这个题我一般会强调是用在数据清洗之类的操作性任务中,并非自动化的程序逻辑,所以人工看一眼就能排查掉不必要的数据。我最早遇到这类问题,就是在早年出现场为客户做数据清洗和维护的过程中。这类不严谨但是方便的数据查询技巧,其实可以给工程师的日常工作带来很多便利。

周末我临时发现我手头的一个数据集内容有问题,需要重新整理,数据量不大不小,几十个GB,两百多万行,在我用来导数据的旧电脑上,要花几个小时,所以我为了查看进度和分析优化,在数据库中加了一个表:

create log(
	id serial primary key,
	ts timestamp default now(),
	line_number integer
);
create index on log(ds desc);

我的数据处理程序每一千条写入一次数据库,于是我就在每次写入的时候,让它顺便在 log 表里写入最后一行的行号:

      val source = Source.fromIterator(() => lines.getLines())
      val action = source
        .map({ line:String =>
          val number = counter.incrementAndGet()
          (number, line)
        })
        .drop(skip)
        .mapAsync(parallels)(tuple => Future(decode(tuple._1, tuple._2)))
        .filter(_.isDefined).map(_.get)
        .grouped(batchSize)
        .mapAsync(parallels)({ articles =>
          val max = articles.map(_.line).max
          val min = articles.map(_.line).min
		  val line_number = max
...
          val count = sql"insert into log(line_number) select ${line_number}"
          db.run(DBIO.seq(actions, count.asUpdate)).recover({
            case err: BatchUpdateException =>
              println(f"save batch failed $err at line in [${min}, ${max}]")
            case err: Exception =>
              println(f"catch a unknown exception when batch save $err in lines [$min, $max]")
          })
        })

      val workflow = action.runWith(Sink.ignore)

      Await.ready(workflow, Duration.Inf)

这个程序并不严谨,它没有用log4j正规的输出日志,我直接将程序运行在 tmux 里,用 shell 重定向 stdout 到一个文件中,然后 detach 后在终端 tail -f。而要查看处理速度和进度,则在另一个终端里,用 psql 连上数据库,执行:

with t as (select r.ts, (r.line_number - line_number) as processed, extract(epoch from  r.ts - l.ts) as seconds, line_number 
	from log as l join log as r on l.id +1 = r.id 
where l.ts > now()-'120 seconds'::interval) select ts, processed, seconds, case when seconds > 0 then processed/seconds else null end as "per senconds", amount from t order by ts;

它的输出结果大概是:

             ts             | processed |  seconds  |    per senconds    | amount
----------------------------+-----------+-----------+--------------------+---------
 2021-09-12 15:33:30 |      1000 |  9.07 | 110.15 | 1760227
 2021-09-12 15:33:43 |      1000 | 13.23 |  75.5515 | 1761227
...
 (15 rows)

这样一个结构。是两分钟内每一批次提交的记录中,后一批的最后一行行号,减前一批,这个数据并不算很严谨,因为 map async 会导致速度快的时候,可能顺序是混乱的。但是比起select count(*) from?的代价要小的多,更重要的是我其实当时想要把数据清洗阶段 fliter 掉的数据也计算在内。

当然,要做严谨准确的话,仍然有很多值得打磨的地方。比如我们也许可以借助递归查询写出更简洁干净的不等连接。但是我当时就是想快速的写出一个程序运行,这个方案是最不费脑子的。过程中用到了 scala,slick 、akka stream 和 postgresql,达到了一个比较满意的结果,我没有很费力,我的老电脑也没有运行的很低效。并且最终我可以看到每一个出错被跳过的位置在哪里,必要的时候,可以逐个分析处理。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-09-14 13:24:52  更:2021-09-14 13:24:59 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 12:58:14-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码