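How Elasticsearch Resolves Concurrency Conflicts
Background
A colleague writing multithreaded bulk-update code found the following: thread A changed the doc field state from 1 to 2, while thread B changed the doc field type from 2 to 3 in the same index. Querying afterwards showed that some docs still had state 1 or type 2. Only then did we remember that ES does not support transactions, which prompted this summary. Runtime environment: 7.12.1
Official explanation of concurrency conflicts
See the official documentation on full document updates and on partial updates. According to the description there, a doc in Elasticsearch appears to be modified in place, but in fact Elasticsearch carries out the following process, exactly as described earlier: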
- Build the JSON from the old document
- Change that JSON
- Delete the old document
- Index a new document
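The four steps above can be pictured with a toy in-memory model. This is only a sketch of an unguarded read-modify-reindex cycle, not an Elasticsearch client: the names store, read, and reindex are made up for illustration, and the two writers mirror threads A and B from the background section.

```python
import copy

# Toy in-memory "index": doc id -> source. Hypothetical, not an Elasticsearch client.
store = {"1": {"state": 1, "type": 2}}


def read(doc_id):
    # Step 1: build the JSON from the old document (a private copy).
    return copy.deepcopy(store[doc_id])


def reindex(doc_id, source):
    # Steps 3 and 4: drop the old document and index the new one.
    store[doc_id] = source


# Both writers read before either one writes.
doc_a = read("1")
doc_b = read("1")

doc_a["state"] = 2  # step 2 for writer A: change only `state`
doc_b["type"] = 3   # step 2 for writer B: change only `type`

reindex("1", doc_a)
reindex("1", doc_b)  # replaces A's whole document with B's stale copy

print(store["1"])  # {'state': 1, 'type': 3}
```

Writer A's change to state is lost because writer B reindexed a copy it had read earlier.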
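From this description it is easy to see that ES has poor support for transactions. A single-document update exists only to save network round trips; the read-modify-reindex sequence itself is not atomic.
The official solution for concurrency conflicts (optimistic concurrency control)
Why optimistic concurrency control (optimistic locking) was chosen: https://www.elastic.co/guide/cn/elasticsearch/guide/current/version-control.html
The concrete control method (_version, optionally combined with version_type=external): https://www.elastic.co/guide/cn/elasticsearch/guide/current/optimistic-concurrency-control.html
Elasticsearch is distributed. When a document is created, updated, or deleted, the new version of the document must be replicated to the other nodes in the cluster. Elasticsearch is also asynchronous and concurrent, which means these replication requests are sent in parallel and may arrive at their destination out of order. Elasticsearch needs a way to ensure that an older version of a document never overwrites a newer one.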
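One way to picture that guarantee is a replica that remembers the version it has stored for each doc and drops anything older. The sketch below is a toy in-memory model under that assumption, not Elasticsearch's replication code; Replica, apply, and the version numbers are hypothetical.

```python
class Replica:
    """Toy replica that keeps only the newest version of each doc."""

    def __init__(self):
        self.docs = {}  # doc id -> (version, source)

    def apply(self, doc_id, version, source):
        """Apply a replicated write unless the stored version is the same or newer."""
        current = self.docs.get(doc_id)
        if current is not None and current[0] >= version:
            return False  # stale write that arrived late; ignore it
        self.docs[doc_id] = (version, source)
        return True


replica = Replica()
# Version 2 overtakes version 1 on the network and arrives first.
applied_v2 = replica.apply("1", 2, {"state": 2})
applied_v1 = replica.apply("1", 1, {"state": 1})
print(applied_v2, applied_v1, replica.docs["1"])  # True False (2, {'state': 2})
```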
root@UCSS-SK135-sps:/
root@UCSS-SK135-sps:/
root@UCSS-SK135-sps:/
{
"_index" : "swg-20210816-01",
"_type" : "_doc",
"_id" : "99b33625-e32e-4d82-b056-419396672c0d",
"_version" : 1,
"_seq_no" : 3,
"_primary_term" : 1,
"found" : true,
"_source" : {
"serialId" : 0,
"sourceIp" : "172.18.0.101",
"detectDateTime" : "2021-08-16T06:11:06.294+0000",
"recordDateTime" : "2021-08-16T06:11:17.422+0000",
"detectTime" : "06:11:06",
"detectType" : 0,
"actionType" : 1,
"channelType" : 1,
"fullUrl" : "http://172.16.0.1/post.php",
"urlHostname" : "172.16.0.1",
"destinationUrl" : "http://172.16.0.1",
"destinationIp" : "172.16.0.1",
"port" : 80,
"method" : "GET",
"statusCode" : 200,
"contentType" : "text/html; charset=UTF-8",
"categoryTypes" : [
1291
],
"riskType" : 0,
"severityType" : 4,
"isSecure" : true,
"statusType" : 0,
"browseTime" : 20,
"hits" : 1,
"bytesUpload" : 0,
"bytesDownload" : 0,
"bytesSent" : 129,
"bytesReceived" : 2581,
"deviceId" : "2429ab75-91b4-3c43-3b08-5f395a0b3133",
"deviceName" : "SWG-SK135",
"isTruncated" : true,
"isConsolidated" : false,
"bodyBytes" : 1159,
"source" : {
"displayName" : "172.18.0.101",
"ip" : "172.18.0.101"
},
"policyUuid" : [
"aa915076-d5c2-4656-a2fe-48b56a4b1d8e"
],
"policyName" : [
"默认策略"
],
"threatType" : [
0
],
"attachments" : [
{
"filename" : "post.php",
"fileType" : 1,
"fileTypeName" : "不可识别的文件类型",
"fileSize" : 0
}
],
"hierarchy" : [
0
],
"httpReqHdrBytes" : 129,
"httpReqBodyBytes" : 0,
"httpRspHdrBytes" : 263,
"httpRspBodyBytes" : 1159,
"blockType" : 0,
"ars" : 0.0,
"mrs" : 0.0,
"ers" : 0.0,
"trs" : 0.0,
"cloudAppCategory" : 0,
"cloudAppTrustValue" : 0.0,
"cloudAppId" : 0,
"sessionStage" : [
1,
2
],
"userAgent" : "curl/7.26.0",
"riskLevel" : 0.0,
"isMobile" : false,
"networkType" : 0,
"queryType" : 2,
"quotaActionType" : 0,
"uuid" : "99b33625-e32e-4d82-b056-419396672c0d"
}
}
When modifying a document, first look up its _version, then pass that value in the update request.
root@UCSS-SK135-sps:/
{
"error": {
"root_cause": [{
"type": "action_request_validation_exception",
"reason": "Validation Failed: 1: internal versioning can not be used for optimistic concurrency control. Please use `if_seq_no` and `if_primary_term` instead;"
}],
"type": "action_request_validation_exception",
"reason": "Validation Failed: 1: internal versioning can not be used for optimistic concurrency control. Please use `if_seq_no` and `if_primary_term` instead;"
}
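At this point an error appeared. It says the old approach is no longer usable and points to new parameters, so I went to check the latest documentation: https://www.elastic.co/guide/en/elasticsearch/reference/current/optimistic-concurrency-control.html https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html The documentation does not say that version cannot be used; it only says that the if_seq_no and if_primary_term parameters are meant for concurrent operations. I then followed it strictly: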
curl -u use:passwd -H "Content-Type: application/json" -X PUT http://169.254.253.5:9200/_cluster/settings -d '{"persistent": {"action.auto_create_index": "mytest"}}'
curl -u use:passwd -H "Content-Type: application/json" -X PUT http://169.254.253.5:9200/mytest/_doc/1 -d '{"message":"mytest"}'
curl -u use:passwd http://169.254.253.5:9200/mytest/_doc/1?pretty
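curl -u use:passwd -H "Content-Type: application/json" -X PUT -d '{"message":"22"}' "http://169.254.253.5:9200/mytest/_doc/1?version=4&version_type=external"
The same error came back, still asking for the new parameters. Note that the URL has to be quoted as shown here. In the original run it was not, so the shell treated & as the background operator and version_type=external never reached Elasticsearch, which most likely explains why the request was still rejected as internal versioning.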
root@UCSS-SK135-sps:/
{
"_index" : "mytest",
"_type" : "_doc",
"_id" : "1",
"_version" : 7,
"result" : "updated",
"_shards" : {
"total" : 2,
"successful" : 1,
"failed" : 0
},
"_seq_no" : 6,
"_primary_term" : 1
}
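With the new method (if_seq_no and if_primary_term) the update succeeds. The official site feels unreliable on this point: a deprecation or replacement should be stated in the documentation. If you find that the old method does work, please leave a comment.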
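A conditional write of this kind can be sketched with the Python standard library. This is a minimal sketch, not an official client: the helper names conditional_index_url and put_if_unchanged are made up here, the host, index, and credentials are copied from the curl commands above, and the values 6 and 1 stand in for whatever _seq_no and _primary_term the preceding GET returned.

```python
import base64
import json
import urllib.error
import urllib.parse
import urllib.request


def conditional_index_url(base_url, index, doc_id, seq_no, primary_term):
    """Build the PUT URL that only succeeds if the doc has not changed since it was read."""
    query = urllib.parse.urlencode(
        {"if_seq_no": seq_no, "if_primary_term": primary_term}
    )
    safe_id = urllib.parse.quote(str(doc_id), safe="")
    return f"{base_url}/{index}/_doc/{safe_id}?{query}"


def put_if_unchanged(url, source, user, password):
    """Send the conditional write; True on success, False on a 409 version conflict."""
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    request = urllib.request.Request(
        url,
        data=json.dumps(source).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Basic {token}",
        },
        method="PUT",
    )
    try:
        with urllib.request.urlopen(request) as response:
            return 200 <= response.status < 300
    except urllib.error.HTTPError as err:
        if err.code == 409:  # another writer changed the doc first
            return False
        raise


# Only the URL is built here; put_if_unchanged would need a reachable cluster.
url = conditional_index_url("http://169.254.253.5:9200", "mytest", 1, 6, 1)
print(url)  # http://169.254.253.5:9200/mytest/_doc/1?if_seq_no=6&if_primary_term=1
```

Building the query string in code also avoids the shell quoting problem that an unquoted & causes on the command line.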
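The corresponding official statement has since been found (internal version is no longer supported for concurrency control):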
https://www.elastic.co/guide/en/elasticsearch/reference/7.14/release-notes-7.0.0-beta1.html
Remove support for internal versioning for concurrency control #38254 (issue: #1078)
The change is stated in the release notes, but the old description was never removed from the documentation.
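How the sequence-number check resolves the conflict from the background section can be sketched with a toy in-memory model. Everything here (ToyIndex, ConflictError, update_with_retry) is hypothetical and only imitates the compare-then-write behaviour of if_seq_no; it ignores _primary_term and is not Elasticsearch code.

```python
import copy


class ConflictError(Exception):
    """Raised when the expected seq_no no longer matches (stands in for HTTP 409)."""


class ToyIndex:
    def __init__(self):
        self._docs = {}  # doc id -> (seq_no, source)
        self._next_seq = 0

    def get(self, doc_id):
        seq_no, source = self._docs[doc_id]
        return seq_no, copy.deepcopy(source)

    def index(self, doc_id, source, if_seq_no=None):
        current = self._docs.get(doc_id)
        if if_seq_no is not None and (current is None or current[0] != if_seq_no):
            raise ConflictError(doc_id)
        self._docs[doc_id] = (self._next_seq, copy.deepcopy(source))
        self._next_seq += 1


def update_with_retry(index, doc_id, change, max_attempts=5):
    """Re-read and re-apply `change` until the conditional write goes through."""
    for _ in range(max_attempts):
        seq_no, source = index.get(doc_id)
        change(source)
        try:
            index.index(doc_id, source, if_seq_no=seq_no)
            return True
        except ConflictError:
            continue  # somebody else wrote first; start over from a fresh read
    return False


idx = ToyIndex()
idx.index("1", {"state": 1, "type": 2})

stale_seq, stale_doc = idx.get("1")                       # writer B reads first
update_with_retry(idx, "1", lambda d: d.update(state=2))  # writer A wins the race

stale_doc["type"] = 3
try:
    idx.index("1", stale_doc, if_seq_no=stale_seq)
    conflict = False
except ConflictError:
    conflict = True  # B's stale write is rejected instead of overwriting A

update_with_retry(idx, "1", lambda d: d.update(type=3))   # B re-reads and retries
print(conflict, idx.get("1")[1])  # True {'state': 2, 'type': 3}
```

The stale write fails loudly, and the retry starts from the current document, so both changes survive.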
Everything above only handles concurrency on a single doc. Solutions for bulk operations:
https://www.elastic.co/guide/cn/elasticsearch/guide/current/denormalization-concurrency.html https://www.elastic.co/guide/cn/elasticsearch/guide/current/concurrency-solutions.html The official guide offers three approaches.
The following are three practical solutions using Elasticsearch, all of which involve some form of locking:
- Global lock (global is just the name of the lock and can be changed freely)
- Document lock
- Tree lock
Global lock: not found in the latest documentation, but it still works.
root@UCSS-SK135-sps:/
{"_index":"mytest","_type":"_doc","_id":"2","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":7,"_primary_term":1}
root@UCSS-SK135-sps:/
{"error":{"root_cause":[{"type":"version_conflict_engine_exception","reason":"[2]: version conflict, document already exists (current version [1])","index_uuid":"gvVR71CqSHuO8hxT-Af1-g","shard":"0","index":"mytest"}],"type":"version_conflict_engine_exception","reason":"[2]: version conflict, document already exists (current version [1])","index_uuid":"gvVR71CqSHuO8hxT-Af1-g","shard":"0","index":"mytest"},"status":409}
URL used: http://169.254.253.5:9200/mytest/_doc/2/_create General form: http://169.254.253.5:9200/{indexName}/_doc/{agreed lock ID}/_create
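The global lock boils down to two requests and one status check, which can be sketched as follows. The helper names lock_requests and lock_acquired are made up for illustration. The acquire URL follows the form shown above, releasing the lock by deleting the lock document follows the approach in the linked guide, and the status codes assume that a successful create returns 201 while the conflict shown above returns 409.

```python
def lock_requests(base_url, index, lock_id="global"):
    """Return the (method, url) pairs that acquire and release the lock document."""
    doc_url = f"{base_url}/{index}/_doc/{lock_id}"
    acquire = ("PUT", f"{doc_url}/_create")  # fails if the lock doc already exists
    release = ("DELETE", doc_url)            # removing the doc frees the lock
    return acquire, release


def lock_acquired(status_code):
    """Interpret the response to the acquire request."""
    if status_code == 201:  # document created: this caller holds the lock
        return True
    if status_code == 409:  # version conflict: someone else holds the lock
        return False
    raise RuntimeError(f"unexpected status {status_code}")


acquire, release = lock_requests("http://169.254.253.5:9200", "mytest", "2")
print(acquire)  # ('PUT', 'http://169.254.253.5:9200/mytest/_doc/2/_create')
print(release)  # ('DELETE', 'http://169.254.253.5:9200/mytest/_doc/2')
print(lock_acquired(201), lock_acquired(409))  # True False
```

Every writer has to agree on the same lock ID and release the lock when done, otherwise all other writers stay blocked.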
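Document locks and tree locks should work as well; I did not try them.
Further reading: https://www.cnblogs.com/huangying2124/p/12806508.html https://blog.csdn.net/u011262847/article/details/78118142 Elasticsearch's features can be used to implement exclusive and shared locks.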