一:创建db表并插入数据:
CREATE TABLE `position` (
`companyName` varchar(300) DEFAULT NULL,
`id` double DEFAULT NULL,
`positionAdvantage` varchar(300) DEFAULT NULL,
`companyId` double DEFAULT NULL,
`positionName` varchar(240) DEFAULT NULL,
`salary` varchar(120) DEFAULT NULL,
`salaryMin` double DEFAULT NULL,
`salaryMax` double DEFAULT NULL,
`salaryMonth` double DEFAULT NULL,
`education` varchar(60) DEFAULT NULL,
`workYear` varchar(60) DEFAULT NULL,
`jobNature` varchar(120) DEFAULT NULL,
`chargeField` blob,
`createTime` datetime DEFAULT NULL,
`email` varchar(300) DEFAULT NULL,
`publishTime` varchar(150) DEFAULT NULL,
`isEnable` double DEFAULT NULL,
`isIndex` double DEFAULT NULL,
`city` varchar(150) DEFAULT NULL,
`orderby` double DEFAULT NULL,
`isAdvice` double DEFAULT NULL,
`showorder` double DEFAULT NULL,
`publishUserId` double DEFAULT NULL,
`workAddress` varchar(300) DEFAULT NULL,
`generateTime` datetime DEFAULT NULL,
`bornTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
`isReward` double DEFAULT NULL,
`rewardMoney` varchar(60) DEFAULT NULL,
`isExpired` double DEFAULT NULL,
`positionDetailPV` double DEFAULT NULL,
`offlineTime` datetime DEFAULT NULL,
`positionDetailPV_cnbeta` double DEFAULT NULL,
`adviceTime` datetime DEFAULT NULL,
`comeFrom` varchar(150) DEFAULT NULL,
`receivedResumeCount` double DEFAULT NULL,
`refuseResumeCount` double DEFAULT NULL,
`markCanInterviewCount` double DEFAULT NULL,
`haveNoticeInterCount` double DEFAULT NULL,
`isForbidden` double DEFAULT NULL,
`reason` varchar(768) DEFAULT NULL,
`verifyTime` datetime DEFAULT NULL,
`adWord` double DEFAULT NULL,
`adRankAndTime` varchar(120) DEFAULT NULL,
`adTimes` double DEFAULT NULL,
`adStartTime` datetime DEFAULT NULL,
`adEndTime` datetime DEFAULT NULL,
`adBeforeDetailPV` double DEFAULT NULL,
`adAfterDetailPV` double DEFAULT NULL,
`adBeforeReceivedCount` double DEFAULT NULL,
`adAfterReceivedCount` double DEFAULT NULL,
`adjustScore` double DEFAULT NULL,
`weightStartTime` datetime DEFAULT NULL,
`weightEndTime` datetime DEFAULT NULL,
`isForward` bit(1) DEFAULT NULL,
`forwardEmail` varchar(300) DEFAULT NULL,
`isSchoolJob` bit(1) DEFAULT NULL,
`type` tinyint(4) DEFAULT NULL,
`prolong_offline_time` datetime DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
二:pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.0.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.lagou</groupId>
<artifactId>lagou-es-project</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>lagou-es-project</name>
<description>Demo project for Spring Boot</description>
<properties>
<elasticsearch.version>7.3.0</elasticsearch.version>
</properties>
<dependencies>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>${elasticsearch.version}</version>
</dependency> <!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch -->
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.3.0</version>
<exclusions>
<exclusion>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>7.3.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-thymeleaf</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency> <!-- HttpClient -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.3</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.58</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency> <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.9</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency> <!--devtools热部署-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<optional>true</optional>
<scope>true</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
三:application.yml 文件
spring:
devtools:
restart:
enabled: true #设置开启热部署
additional-paths: src/main/java #重启目录
exclude: WEB-INF/**
freemarker:
cache: false #页面不加载缓存,修改即时生效
elasticsearch:
rest:
uris: 192.168.211.136:9200,192.168.211.136:9201,192.168.211.136:9202
server:
port: 8083
logging:
level:
root: info
com.xdclass.search: debug
四:model
package com.es.model;
import com.alibaba.fastjson.annotation.JSONType;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.format.annotation.DateTimeFormat;
import java.util.Date;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Position {
//主键
private String id;
//公司名称
private String companyName;
//职位名称
private String positionName;
//职位诱惑
private String positionAdvantage;
//薪资
private String salary;
//薪资下限
private int salaryMin;
//薪资上限
private int salaryMax;
//学历
private String education;
//工作年限
private String workYear;
//发布时间
private String publishTime;
//工作城市
private String city;
//工作地点
private String workAddress;
// 发布时间
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private Date createTime;
// 工作模式
private String jobNature;
}
五:ES配置类
package com.es.config;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class EsConfig {
@Value("${spring.elasticsearch.rest.uris}")
private String hostlist;
@Bean
public RestHighLevelClient client() {
//解析hostlist配置信息
String[] split = hostlist.split(",");
//创建HttpHost数组,其中存放es主机和端口的配置信息
HttpHost[] httpHostArray = new HttpHost[split.length];
for (int i = 0; i < split.length; i++) {
String item = split[i];
System.out.println(item);
httpHostArray[i] = new HttpHost(item.split(":")[0], Integer.parseInt(item.split(":")[1]), "http");
}
//创建RestHighLevelClient客户端
return new RestHighLevelClient(RestClient.builder(httpHostArray));
}
}
六:mysql连接工具
package com.es.util;
import java.sql.Connection;
import java.sql.DriverManager;
public class DBHelper {
public static final String url = "jdbc:mysql://192.168.211.136:3306/lagou_position? useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai";
public static final String name = "com.mysql.cj.jdbc.Driver";
public static final String user = "root";
public static final String password = "123456";
public static Connection conn = null;
public static Connection getConn() {
try {
Class.forName(name);
conn = DriverManager.getConnection(url, user, password);//获取连接
} catch (Exception e) {
e.printStackTrace();
}
return conn;
}
}
七:接口和实现
public interface PositionService {
/*** 分页查询
* @param keyword * @param pageNo * @param pageSize * @return */
public List<Map<String, Object>> searchPos(String keyword, int pageNo, int pageSize) throws IOException;
/*** 导入数据 */
void importAll() throws IOException;
}
package com.lagou.es.service.impl;
import com.es.config.EsConfig;
import com.es.service.PositionService;
import com.es.util.DBHelper;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
@Service
public class PositionServiceImpl implements PositionService {
private static final Logger logger = LogManager.getLogger(PositionServiceImpl.class);
@Autowired
private RestHighLevelClient client;
private static final String POSITIOIN_INDEX = "position";
//查找职位
public List<Map<String, Object>> searchPos(String keyword, int pageNo, int pageSize) throws IOException {
if (pageNo <= 1) {
pageNo = 1;
}
//getPosition(keyword);
// 条件搜索
SearchRequest searchRequest = new SearchRequest(POSITIOIN_INDEX);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
//分页 index = (当前页-1)*一页显示条数
searchSourceBuilder.from((pageNo - 1) * pageSize);
searchSourceBuilder.size(pageSize);
//精准匹配
// TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery("positionName",keyword);
// searchSourceBuilder.query(termQueryBuilder);
QueryBuilder builder = QueryBuilders.matchQuery("positionName", keyword);
searchSourceBuilder.query(builder);
searchSourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));
//执行搜索
searchRequest.source(searchSourceBuilder);
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
ArrayList<Map<String, Object>> list = new ArrayList<>();
SearchHit[] hits = searchResponse.getHits().getHits();
System.out.println(hits.length);
for (SearchHit hit : hits) {
list.add(hit.getSourceAsMap());
}
return list;
}
@Override
public void importAll() throws IOException {
writeMysqlDataToES(POSITIOIN_INDEX);
}
/**
* 讲数据批量写入ES中
*/
private void writeMysqlDataToES(String tableName) {
BulkProcessor bulkProcessor = getBulkProcessor(client);
Connection conn = null;
PreparedStatement ps = null;
ResultSet rs = null;
try {
conn = DBHelper.getConn();
logger.info("Start handle data :" + tableName);
String sql = "SELECT * from " + tableName;
ps = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
// 根据自己需要 设置
ps.setFetchSize(20);
rs = ps.executeQuery();
ResultSetMetaData colData = rs.getMetaData();
ArrayList<HashMap<String, String>> dataList = new ArrayList<HashMap<String, String>>();
// bulkProcessor 添加的数据支持的方式并不多,查看其api发现其支持map键值对的 方式,故笔者在此将查出来的数据转换成hashMap方式
HashMap<String, String> map = null;
int count = 0;
String c = null;
String v = null;
while (rs.next()) {
count++;
map = new HashMap<String, String>(128);
for (int i = 1; i <= colData.getColumnCount(); i++) {
c = colData.getColumnName(i);
v = rs.getString(c);
map.put(c, v);
}
dataList.add(map);
// 每1万条写一次,不足的批次的最后再一并提交
if (count % 10000 == 0) {
logger.info("Mysql handle data number : " + count);
// 将数据添加到 bulkProcessor 中
for (HashMap<String, String> hashMap2 : dataList) {
bulkProcessor.add(new IndexRequest(POSITIOIN_INDEX).source(hashMap2));
}
// 每提交一次便将map与list清空
map.clear();
dataList.clear();
}
}
// 处理未提交的数据
for (HashMap<String, String> hashMap2 : dataList) {
bulkProcessor.add(new IndexRequest(POSITIOIN_INDEX).source(hashMap2));
System.out.println(hashMap2);
}
logger.info("-------------------------- Finally insert number total : " + count);
// 将数据刷新到es, 注意这一步执行后并不会立即生效,取决于bulkProcessor设置的 刷新时间
bulkProcessor.flush();
} catch (Exception e) {
logger.error(e.getMessage());
} finally {
try {
rs.close();
ps.close();
conn.close();
boolean terminatedFlag = bulkProcessor.awaitClose(150L, TimeUnit.SECONDS);
logger.info(terminatedFlag);
} catch (Exception e) {
logger.error(e.getMessage());
}
}
}
private BulkProcessor getBulkProcessor(RestHighLevelClient client) {
BulkProcessor bulkProcessor = null;
try {
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
logger.info("Try to insert data number : " + request.numberOfActions());
}
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
logger.info("************** Success insert data number : " + request.numberOfActions() + " , id: "
+ executionId);
}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
logger.error("Bulk is unsuccess : " + failure + ", executionId: " + executionId);
}
};
BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer = (request, bulkListener) -> client
.bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
BulkProcessor.Builder builder = BulkProcessor.builder(bulkConsumer, listener);
builder.setBulkActions(5000);
builder.setBulkSize(new ByteSizeValue(100L, ByteSizeUnit.MB));
builder.setConcurrentRequests(10);
builder.setFlushInterval(TimeValue.timeValueSeconds(100L));
builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSecon ds(1L), 3));
// 注意点:让参数设置生效
bulkProcessor = builder.build();
} catch (Exception e) {
e.printStackTrace();
try {
bulkProcessor.awaitClose(100L, TimeUnit.SECONDS);
} catch (Exception e1) {
logger.error(e1.getMessage());
}
}
return bulkProcessor;
}
}
BulkProcessor 官网介绍
https://www.elastic.co/guide/en/elasticsearch/client/java-api/7.3/java-docs-bulk-processor.html
?
八:Controller
package com.lagou.es.controller;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.lagou.es.service.PositionService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import java.io.IOException;
import java.util.List;
import java.util.Map;
@Controller
public class PositionController {
@Autowired
private PositionService service;
//测试范文页面
@GetMapping({"/", "index"})
public String indexPage() {
return "index";
}
@GetMapping("/search/{keyword}/{pageNo}/{pageSize}")
@ResponseBody
public List<Map<String, Object>> searchPosition(@PathVariable("keyword") String keyword,
@PathVariable("pageNo") int pageNo, @PathVariable("pageSize") int pageSize) throws IOException {
List<Map<String, Object>> list = service.searchPos(keyword, pageNo, pageSize);
System.out.println(list);
return list;
}
@RequestMapping("/importAll")
@ResponseBody
public String importAll() {
try {
service.importAll();
} catch (IOException e) {
e.printStackTrace();
}
return "success";
}
}
?十:启动类
package com.lagou.es;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class SearchApplication {
public static void main(String[] args) {
SpringApplication.run(SearchApplication.class, args);
}
}
|