- 官方文档地址: https://elasticsearch-dsl.readthedocs.io/en/latest/configuration.html
- kabana线上地址:xxxxx
- 线上测试接口:xxxxx
- 线上测试文档:xxxxxx
一: 连接配置
-
1: 基本的连接配置: (默认起了一个名字:default ) from elasticsearch_dsl import connections
connections.create_connection(hosts['xxx'],timeout=20)
-
2:连接起别名: 如果起别名了,则查询对象声明的时候,必须声明这个连接才能使用,否则使用的是默认的“default”。 from elasticsearch_dsl import connections
connections.create_connection(
alias="my_connection",
hosts=['xxx'],
timeout=20)
-
3: 权限验证:
- 有些线上的环境,我们需要权限认证登录,则有特殊的写法:
hosts = ["http://username:password@10.73.18.240:9200"]
二: 查询操作
1: 简单的查询操作:
- 1: 实例化Search对象:
- s = Search(Using = ”连接名“, index=“索引名”)
from elasticsearch_dsl import connections, Search
connections.create_connection(
alias="my_connection",
hosts=['xxx'],
timeout=20)
s = Search(using="my_connection", index="xxxxx")
- 2:简单的查询操作:
- search.query(”匹配方式“, 字段的key=字段的值).execute()
- 得到的response是个Response对象,这个对象类似于列表。
- 遍历这个列表,得到的是Hit对象,这个对象类似于字典。可以直接使用[]方式取值。
- Hit对象.to_dict()可以直接转换成字典对象。
from elasticsearch_dsl import connections, Search
connections.create_connection(hosts=hosts, timeout=timeout, alias=alias)
search = Search(using=alias, index="xxxxxx")
response = search.query("match", lc_no="LC20072413391922").execute()
for res in response:
print(res['status'], res['request_time'])
for res in response:
my_dict = res.to_dict()
print(my_dict)
- 3: 基于查询进行删除操作:
- search.delete()即可。
- (由于线上环境,不方便演示)
2: 普通查询:
-
1: Match:模糊匹配
- 如果该字段可以被IK中文分词,则匹配被分词后的所有数据。
- 如果该字段不能被分词,则等效于等值匹配。
- query()中可以填入一个Match来进行查询操作。
from elasticsearch_dsl import connections, Search
from elasticsearch_dsl.query import Match
connections.create_connection(hosts=hosts, timeout=timeout, alias=alias)
search = Search(using=alias, index="xxxxx")
response = search.query(Match(request_id={"query": "a088d751"}))
for res in response:
print(res.to_dict().get('request_id'))
-
2: MultiMatch: 模糊查询,可以匹配多列。
connections.create_connection(hosts=hosts, timeout=timeout, alias=alias)
search = Search(using=alias, index="xxxxxxx")
response = search.query(MultiMatch(query="a088d751", fields=['request_id', ]))
for res in response:
print(res.to_dict().get('request_id'))
-
3: Q对象的使用:
- 我们还可以直接将Q对象放入query()中进行查询:
- from elasticsearch_dsl.query import Q
- Q对象可以直接包含一个DSL语句,也可以参数方式构造。
from elasticsearch_dsl import connections, Search
from elasticsearch_dsl.query import Q
connections.create_connection(hosts=hosts, timeout=timeout, alias=alias)
search = Search(using=alias, index="xxxxxxx")
response = search.query(Q("multi_match", query="a088d751", fields=['request_id', ]))
for res in response:
print(res.to_dict().get('request_id'))
response2 = search.query(Q({"multi_match": {"query": "a088d751", "fields": ["request_id", ]}}))
for res in response2:
print(res.to_dict().get('request_id'))
- query()还支持省略Q对象,而是直接将Q中的内容直接写入的方式。
response = search.query("multi_match", query="a088d751", fields=['request_id', ])
for res in response:
print(res.to_dict().get('request_id'))
response2 = search.query({"multi_match": {"query": "a088d751", "fields": ["request_id", ]}})
for res in response2:
print(res.to_dict().get('request_id'))
response2 = search.query(Q('bool', must=[Q('match', request_id='a088d751'), Q('match', lc_no='LC20072413391922')]))
for res in response2:
print(res.to_dict().get('request_id'))
print(res.to_dict().get('lc_no'))
3: 组合查询:
-
1: 使用Q对象进行组合查询:
response2 = search.query(Q("match", request_id='a088d751') | Q("match", request_id="4f8e648a"))
for res in response2:
print(res.to_dict().get('request_id'))
-
2: 查询连接查询的方式:
response2 = search.query("match", request_id="4f8e648a").query("match", lc_no="LC21081909540613")
for res in response2:
print(res.to_dict().get('request_id'))
-
3: Q对象构造复杂查询: q = Q('bool',
must=[Q('match', title='python')],
should=[Q(...), Q(...)],
minimum_should_match=1
)
s = Search().query(q)
三: 过滤
-
案例一: 查询2021年8月25日0时刻到2021年8月26日零时刻,设备号为QbJK/5PqQ的所有记录。(结果是30条记录)
response = search.query("term", serial="QbJK/5PqQ").filter("range",
request_time={"gte": "2021-08-25T00:00:00+0800",
"lt": "2021-08-26T00:00:10+0800"}
).params(size=10000)
for hit in response:
print(hit.to_dict().get('serial'), hit.to_dict().get('request_time'))
-
案例二: 查询2021年8月25日0时刻到2021年8月26日零时刻,设备号为QtAVdFXBb和QbJK/zLIw的所有记录数量(时间闭区间下: 数量是385, 左闭右开下是:383)
- 注意: 使用count()就不能使用params()限制数量了。
- 等值查询单个用term, 多个值用terms。
connections.create_connection(hosts=settings.ES_HOST, timeout=settings.ES_TIME_OUT, alias=settings.ES_ALIAS)
search = Search(using=settings.ES_ALIAS, index="xxxxxxxx")
counts = search.query("terms", serial=["QtAVdFXBb", "QbJK/zLIw"]).filter("range",
request_time={
"gte": "2021-08-25T00:00:00+0800",
"lte": "2021-08-26T00:00:00+0800"}
).count()
print(counts)
四: 聚合:
- 1: 聚合定义:
- 2: 聚合嵌套:
- bucket(): 满足特定条件的文档的集合。
- metric(): 对桶内的文档进行统计计算(例如最小值,求和,最大值等)。
- pipline(): 管道
- 3: 聚合添加到search对象:
4.1: 聚合案例:
-
测试数据格式: 会有一个汽车数据文档,文档属性有4个: 价格,颜色,
price | color | make | sold |
---|
interger | text | text | date | | fielddata | fieddata | format: “yyyy-MM-dd” |
-
1: 统计哪种颜色汽车的销量最好
a = A("terms", field="color")
search.aggs.bucket("popular_color", a)
search.execute()
-
2: 统计每种颜色车的平均价格:
- 思路: 根据颜色聚合,聚合完对每种颜色中的价格取平均。
a1 = A("terms", field="color")
a2 = A("avg", filed="price")
search.aggs.bucket("colors", a1).metric("avg_price", a2)
search.execute()
-
3: 统计每种颜色中的每种品牌的平均价格:
- 思路:先根据颜色聚合,再根据品牌聚合,然后对每个品牌取均价。
search.aggs.bucket("colors", "terms", field="color")
search.aggs['colors'].bucket("make", "terms", filed = "make")
search.aggs['colors'].aggs["make"].mertric("avg_price", "avg", field="price")
search.execute()
-
4:统计每种品牌每种颜色的最高价格和最低价格 search.aggs.bucket("colors", "terms", field="color")
search.aggs["colors"].bucket("make", "terms", field="make")
search.aggs["colors"].aggs["make"].metric("min_price", "min", field="price")
search.aggs["colors"].aggs["make"].metric("max_price", "max", field="price")
search.execute()
五: 排序:
-
1: sort()中直接写一个列名,表示对这个列正序排列。 -
2: sort()中写一个列名前面加一个负号,表示对这个列逆序排列。 -
3: 注意:进行排序验证打印的时候,最好用time.sleep休眠一下,不然终端还是显示乱序的。 -
4 : 案例: 查询2021年8月25日0时刻到2021年8月26日零时刻,设备号为QbJK/5PqQ的所有记录, 按照时间从小到大排列: response = search.query("range",
request_time={"gte": "2021-08-26T00:00:00+0800",
"lt": "2021-08-27T00:00:00+0800"}
).query("term", serial="QbJK/5PqQ").params(size=10000).sort("request_time")
for hit in response:
data = hit.to_dict()
print(data.get("serial"), data.get('request_time'))
time.sleep(1)
-
5: 案例: 查询2021年8月25日0时刻到2021年8月26日零时刻,设备号为QbJK/5PqQ的所有记录, 按照时间从大到小排列: response = search.query("range",
request_time={"gte": "2021-08-26T00:00:00+0800",
"lt": "2021-08-27T00:00:00+0800"}
).query("term", serial="QbJK/5PqQ").params(size=10000).sort("-request_time")
for hit in response:
data = hit.to_dict()
print(data.get("serial"), data.get('request_time'))
time.sleep(1)
六: 分页:
-
1: 直接使用切片: connections.create_connection(hosts=settings.ES_HOST, timeout=settings.ES_TIME_OUT, alias=settings.ES_ALIAS)
search = Search(using=settings.ES_ALIAS, index="xxxxxx")
response = search.query()[10:11]
for hit in response:
print(hit.to_dict().get("product_id"))
-
2: 如果要获取拿到的所有的数据:可以使用scan:
connections.create_connection(hosts=settings.ES_HOST, timeout=settings.ES_TIME_OUT, alias=settings.ES_ALIAS)
search = Search(using=settings.ES_ALIAS, index="xxxxxx")
response = search.query()
for hit in response.scan():
print(hit.to_dict().get("product_id"))
|