IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> ElasticSearch学习路线(Python) -> 正文阅读

[大数据]ElasticSearch学习路线(Python)

安装就略过了,建议安装最近的就好,网上挺多教程的。两款比较好用的插件elasticsearch-head和kibana。安装好之后学习路线如下

第一阶段:Elasticsearch官方API文档(按每天8小时来算花费一周)

链接:
2.x版本中文
7.x版本英文
这两个官方api文档结合着看,虽然2.x和7.x的有些地方不一样,但是很多底层的东西,包括原理是一致的,而且2.x是中文版的好理解。重要的部分用Postman测测学学

第二阶段:Elasticsearch-py官方API文档(按每天8小时来算花费三天)

链接:7.x
这个是Python调用Elasticsearch的接口建议在Pycharm里测试一下

第三阶段:Elasticsearch-dsl官方API文档(按每天8小时来算花费三天)

链接:elasticsearch-dsl
这个模块能够简化Elasticsearch-py中有关查询的操作,同样建议在Pycharm里测试一下

第四阶段:Elasticsearch实践(按需花费)

案例:Elasticsearch实战 | 如何从数千万手机号中识别出情侣号?
这里附上将案例中的实现方式简单转换为Python实现方式的代码

"""
https://mp.weixin.qq.com/s?__biz=MzI2NDY1MTA3OQ==&mid=2247484728&idx=1&sn=eeb76ad84c98af16fc16d6dc5d5d11af#wechat_redirect"""
from elasticsearch import Elasticsearch
from elasticsearch_dsl import Mapping
from loguru import logger

es = Elasticsearch()


def insert_data():
    data = """
        {"index": {"_id": 1}},
        {"phone_number": "13511112222"},
        {"index": {"_id": 2}},
        {"phone_number": "13611112222"},
        {"index": {"_id": 3}},
        {"phone_number": "13711112222"},
        {"index": {"_id": 4}},
        {"phone_number": "13811112222"},
        {"index": {"_id": 5}},
        {"phone_number": "13844248474"},
        {"index": {"_id": 6}},
        {"phone_number": "13866113333"},
        {"index": {"_id": 7}},
        {"phone_number": "15766113333"}
    """
    data = data.replace(',', '')
    res = es.bulk(body=data, index="phone_index")  # 批量操作
    logger.info(res)


def prepare():
    body = \
        {
            "description": "Adds insert_time timestamp to documents",
            "processors": [
                {
                    "set": {
                        "field": "_source.insert_time",
                        "value": "{{_ingest.timestamp}}"  # 时间戳
                    }
                },
                {
                    "script": {
                        "lang": "painless",
                        "source": "ctx.last_eight_number = (ctx.phone_number.substring(3,11))"
                    }
                }
            ]
        }
    # 创建一个管道
    res = es.ingest.put_pipeline(id='initialize', body=body, ignore=400)   # 400是重复创建会返回的状态码,忽视程序可以继续走下去
    logger.info(res)

    body = {

        "index_patterns": 'phone_index',
        "template": {
            "settings": {
                "number_of_replicas": 0,
                "index.default_pipeline": 'initialize',
                "index": {
                    "max_ngram_diff": "13",
                    "analysis": {
                        "analyzer": {
                            "ngram_analyzer": {
                                "tokenizer": "ngram_tokenizer"
                            }
                        },
                        "tokenizer": {
                            "ngram_tokenizer": {
                                "token_chars": [
                                    "letter",
                                    "digit"
                                ],
                                "min_gram": "1",
                                "type": "ngram",
                                "max_gram": "11"
                            }
                        }
                    }
                }
            },
            "mappings": {
                "properties": {
                    "insert_time": {
                        "type": "date"
                    },
                    "last_eight_number": {
                        "type": "keyword"
                    },
                    "phone_number": {
                        "type": "text",
                        "fields": {
                            "keyword": {
                                "type": "keyword"
                            }
                        },
                        "analyzer": "ngram_analyzer"
                    }
                }
            }
        }
    }

    # 创建一个索引模板
    res = es.indices.put_index_template(name="phone_template", body=body, ignore=400)
    logger.info(res)

    # 创建索引
    res = es.indices.create(index="phone_index", ignore=400)
    logger.info(res)
    # res = es.indices.create(index="phone_couple_index", ignore=400)
    # logger.info(res)

    # 插入数据
    insert_data()


def get_need_hits_list():
    """提取出情侣号(>=2)的手机号或对应id。"""
    body = {
        "size": 0,
        "query": {
            "range": {
                "insert_time": {
                    "gte": 1629659503000,  # 时间戳(ms)
                    "lte": 1629688618000
                }
            }
        },
        "aggs": {
            "last_aggs": {
                "terms": {
                    "field": "last_eight_number",
                    "min_doc_count": 2,
                    "size": 10,
                    "shard_size": 30
                },
                "aggs": {
                    "sub_top_hits_aggs": {
                        "top_hits": {
                            "size": 100,
                            "_source": {
                                "includes": "phone_number"
                            },
                            "sort": [
                                {
                                    "phone_number.keyword": {
                                        "order": "asc"
                                    }
                                }
                            ]
                        }
                    }
                }
            }
        }
    }
    res = es.search(body=body, index='phone_index')
    logger.info(res)
    
    # 获取满足要的id
    need_buckets_list = res['aggregations']['last_aggs']['buckets']
    i = 0
    while i < len(need_buckets_list):
        yield need_buckets_list[i]['key'], need_buckets_list[i]['sub_top_hits_aggs']['hits']['hits']
        i += 1


def create_couple_index_template():
    """给情侣号创建索引模板"""
    body = {

        "index_patterns": "phone_couple_[0-9]{8}",
        "template": {
            "settings": {
                "number_of_replicas": 0,
                "index": {
                    "max_ngram_diff": "13",
                    "analysis": {
                        "analyzer": {
                            "ngram_analyzer": {
                                "tokenizer": "ngram_tokenizer"
                            }
                        },
                        "tokenizer": {
                            "ngram_tokenizer": {
                                "token_chars": [
                                    "letter",
                                    "digit"
                                ],
                                "min_gram": "1",
                                "type": "ngram",
                                "max_gram": "11"
                            }
                        }
                    }
                }
            },
            "mappings": {
                "properties": {
                    "phone_number": {
                        "type": "text",
                        "fields": {
                            "keyword": {
                                "type": "keyword"
                            }
                        },
                        "analyzer": "ngram_analyzer"
                    }
                }
            }
        }
    }
    res = es.indices.put_index_template(name="phone_couple_template", body=body, ignore=400)
    logger.info(res)


def reindex():
    """取出的满足条件的id进行跨索引迁移。"""

    g = get_need_hits_list()
    while True:
        try:
 
            index_key, hits_list = next(g)
            ids_list = [hit['_id'] for hit in hits_list]
            
            # 创建一个新的索引
            res = es.indices.create(index=f"phone_couple_{index_key}_index", ignore=400)
            logger.info(res)
            # 索引迁移
            body = {
                "source": {
                    "index": "phone_index",
                    "query": {
                        "terms": {
                            "_id": ids_list
                        }
                    }
                },
                "dest": {
                    "index": f"phone_couple_{index_key}_index"
                }
            }
            res = es.reindex(body=body)
            logger.info(res)
        except StopIteration:
            break


if __name__ == '__main__':
    prepare()
    create_couple_index_template()
    reindex()

这个程序可以使用dsl简化,感兴趣的朋友可以使用dsl再写一遍。
然后有关时间戳有个坑,就是es默认是UTC但是中国是东八区所以会有8小时的时间差,当你发现入库时间和本地时间不一样的时候不要惊讶,查询的时候以库中的时间戳为准

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-25 12:16:45  更:2021-08-25 12:16:47 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 19:02:27-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码