IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> elasticSearch-搜索实战 -> 正文阅读

[大数据]elasticSearch-搜索实战

一:创建db表并插入数据:

CREATE TABLE `position` (
  `companyName` varchar(300) DEFAULT NULL,
  `id` double DEFAULT NULL,
  `positionAdvantage` varchar(300) DEFAULT NULL,
  `companyId` double DEFAULT NULL,
  `positionName` varchar(240) DEFAULT NULL,
  `salary` varchar(120) DEFAULT NULL,
  `salaryMin` double DEFAULT NULL,
  `salaryMax` double DEFAULT NULL,
  `salaryMonth` double DEFAULT NULL,
  `education` varchar(60) DEFAULT NULL,
  `workYear` varchar(60) DEFAULT NULL,
  `jobNature` varchar(120) DEFAULT NULL,
  `chargeField` blob,
  `createTime` datetime DEFAULT NULL,
  `email` varchar(300) DEFAULT NULL,
  `publishTime` varchar(150) DEFAULT NULL,
  `isEnable` double DEFAULT NULL,
  `isIndex` double DEFAULT NULL,
  `city` varchar(150) DEFAULT NULL,
  `orderby` double DEFAULT NULL,
  `isAdvice` double DEFAULT NULL,
  `showorder` double DEFAULT NULL,
  `publishUserId` double DEFAULT NULL,
  `workAddress` varchar(300) DEFAULT NULL,
  `generateTime` datetime DEFAULT NULL,
  `bornTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  `isReward` double DEFAULT NULL,
  `rewardMoney` varchar(60) DEFAULT NULL,
  `isExpired` double DEFAULT NULL,
  `positionDetailPV` double DEFAULT NULL,
  `offlineTime` datetime DEFAULT NULL,
  `positionDetailPV_cnbeta` double DEFAULT NULL,
  `adviceTime` datetime DEFAULT NULL,
  `comeFrom` varchar(150) DEFAULT NULL,
  `receivedResumeCount` double DEFAULT NULL,
  `refuseResumeCount` double DEFAULT NULL,
  `markCanInterviewCount` double DEFAULT NULL,
  `haveNoticeInterCount` double DEFAULT NULL,
  `isForbidden` double DEFAULT NULL,
  `reason` varchar(768) DEFAULT NULL,
  `verifyTime` datetime DEFAULT NULL,
  `adWord` double DEFAULT NULL,
  `adRankAndTime` varchar(120) DEFAULT NULL,
  `adTimes` double DEFAULT NULL,
  `adStartTime` datetime DEFAULT NULL,
  `adEndTime` datetime DEFAULT NULL,
  `adBeforeDetailPV` double DEFAULT NULL,
  `adAfterDetailPV` double DEFAULT NULL,
  `adBeforeReceivedCount` double DEFAULT NULL,
  `adAfterReceivedCount` double DEFAULT NULL,
  `adjustScore` double DEFAULT NULL,
  `weightStartTime` datetime DEFAULT NULL,
  `weightEndTime` datetime DEFAULT NULL,
  `isForward` bit(1) DEFAULT NULL,
  `forwardEmail` varchar(300) DEFAULT NULL,
  `isSchoolJob` bit(1) DEFAULT NULL,
  `type` tinyint(4) DEFAULT NULL,
  `prolong_offline_time` datetime DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

二:pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.0.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.lagou</groupId>
    <artifactId>lagou-es-project</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>lagou-es-project</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <elasticsearch.version>7.3.0</elasticsearch.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>${elasticsearch.version}</version>
        </dependency> <!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch -->
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>${elasticsearch.version}</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>7.3.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.elasticsearch</groupId>
                    <artifactId>elasticsearch</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>7.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-thymeleaf</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency> <!-- HttpClient -->
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.3</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.58</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency> <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.9</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency> <!--devtools热部署-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <optional>true</optional>
            <scope>true</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

三:application.yml 文件

spring:
  devtools:
    restart:
      enabled: true #设置开启热部署 
      additional-paths: src/main/java #重启目录 
      exclude: WEB-INF/** 
    freemarker: 
      cache: false #页面不加载缓存,修改即时生效 
      
  elasticsearch: 
    rest: 
      uris: 192.168.211.136:9200,192.168.211.136:9201,192.168.211.136:9202 
server: 
  port: 8083 
  
logging: 
  level: 
    root: info 
    com.xdclass.search: debug

四:model

package com.es.model;

import com.alibaba.fastjson.annotation.JSONType;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.format.annotation.DateTimeFormat;
import java.util.Date;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class Position {

    //主键 
    private String id;
    //公司名称 
    private String companyName;
    //职位名称 
    private String positionName;
    //职位诱惑 
    private String positionAdvantage;
    //薪资 
    private String salary;
    //薪资下限 
    private int salaryMin;
    //薪资上限 
    private int salaryMax;
    //学历 
    private String education;
    //工作年限 
    private String workYear;
    //发布时间 
    private String publishTime;
    //工作城市 
    private String city;
    //工作地点 
    private String workAddress;
    // 发布时间 
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private Date createTime;
    // 工作模式 
    private String jobNature;
}

五:ES配置类

package com.es.config;

import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class EsConfig {

    @Value("${spring.elasticsearch.rest.uris}")
    private String hostlist;

    @Bean
    public RestHighLevelClient client() {
//解析hostlist配置信息 
        String[] split = hostlist.split(",");
        //创建HttpHost数组,其中存放es主机和端口的配置信息 
        HttpHost[] httpHostArray = new HttpHost[split.length];
        for (int i = 0; i < split.length; i++) {
            String item = split[i];
            System.out.println(item);
            httpHostArray[i] = new HttpHost(item.split(":")[0], Integer.parseInt(item.split(":")[1]), "http");
        }
        //创建RestHighLevelClient客户端 
        return new RestHighLevelClient(RestClient.builder(httpHostArray));
    }
}

六:mysql连接工具

package com.es.util;

import java.sql.Connection;
import java.sql.DriverManager;

public class DBHelper {

    public static final String url = "jdbc:mysql://192.168.211.136:3306/lagou_position? useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai";
    public static final String name = "com.mysql.cj.jdbc.Driver";
    public static final String user = "root";
    public static final String password = "123456";
    public static Connection conn = null;

    public static Connection getConn() {
        try {
            Class.forName(name);
            conn = DriverManager.getConnection(url, user, password);//获取连接 
        } catch (Exception e) {
            e.printStackTrace();
        }
        return conn;
    }
}

七:接口和实现

public interface PositionService {

    /*** 分页查询
     * @param keyword * @param pageNo * @param pageSize * @return */
    public List<Map<String, Object>> searchPos(String keyword, int pageNo, int pageSize) throws IOException;

    /*** 导入数据 */
    void importAll() throws IOException;
}
package com.lagou.es.service.impl;

import com.es.config.EsConfig;
import com.es.service.PositionService;
import com.es.util.DBHelper;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

@Service
public class PositionServiceImpl implements PositionService {

    private static final Logger logger = LogManager.getLogger(PositionServiceImpl.class);
    @Autowired
    private RestHighLevelClient client;
    private static final String POSITIOIN_INDEX = "position";

    //查找职位
    public List<Map<String, Object>> searchPos(String keyword, int pageNo, int pageSize) throws IOException {
        if (pageNo <= 1) {
            pageNo = 1;
        }
        //getPosition(keyword); 
        // 条件搜索 
        SearchRequest searchRequest = new SearchRequest(POSITIOIN_INDEX);
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        //分页 index = (当前页-1)*一页显示条数 
        searchSourceBuilder.from((pageNo - 1) * pageSize);
        searchSourceBuilder.size(pageSize);
        //精准匹配 
        // TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery("positionName",keyword); 
        // searchSourceBuilder.query(termQueryBuilder); 
        QueryBuilder builder = QueryBuilders.matchQuery("positionName", keyword);
        searchSourceBuilder.query(builder);
        searchSourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));
        //执行搜索 
        searchRequest.source(searchSourceBuilder);
        SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
        ArrayList<Map<String, Object>> list = new ArrayList<>();
        SearchHit[] hits = searchResponse.getHits().getHits();
        System.out.println(hits.length);
        for (SearchHit hit : hits) {
            list.add(hit.getSourceAsMap());
        }
        return list;
    }


    @Override
    public void importAll() throws IOException {
        writeMysqlDataToES(POSITIOIN_INDEX);
    }


    /**
     * 讲数据批量写入ES中
     */
    private void writeMysqlDataToES(String tableName) {
        BulkProcessor bulkProcessor = getBulkProcessor(client);
        Connection conn = null;
        PreparedStatement ps = null;
        ResultSet rs = null;
        try {
            conn = DBHelper.getConn();
            logger.info("Start handle data :" + tableName);
            String sql = "SELECT * from " + tableName;
            ps = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
            // 根据自己需要 设置 
            ps.setFetchSize(20);
            rs = ps.executeQuery();
            ResultSetMetaData colData = rs.getMetaData();
            ArrayList<HashMap<String, String>> dataList = new ArrayList<HashMap<String, String>>();
            // bulkProcessor 添加的数据支持的方式并不多,查看其api发现其支持map键值对的 方式,故笔者在此将查出来的数据转换成hashMap方式 
            HashMap<String, String> map = null;
            int count = 0;
            String c = null;
            String v = null;
            while (rs.next()) {
                count++;
                map = new HashMap<String, String>(128);
                for (int i = 1; i <= colData.getColumnCount(); i++) {
                    c = colData.getColumnName(i);
                    v = rs.getString(c);
                    map.put(c, v);
                }
                dataList.add(map);
                // 每1万条写一次,不足的批次的最后再一并提交 
                if (count % 10000 == 0) {
                    logger.info("Mysql handle data number : " + count);
                    // 将数据添加到 bulkProcessor 中 
                    for (HashMap<String, String> hashMap2 : dataList) {
                        bulkProcessor.add(new IndexRequest(POSITIOIN_INDEX).source(hashMap2));
                    }
                    // 每提交一次便将map与list清空 
                    map.clear();
                    dataList.clear();
                }
            }
            // 处理未提交的数据 
            for (HashMap<String, String> hashMap2 : dataList) {
                bulkProcessor.add(new IndexRequest(POSITIOIN_INDEX).source(hashMap2));
                System.out.println(hashMap2);
            }
            logger.info("-------------------------- Finally insert number total : " + count);
            // 将数据刷新到es, 注意这一步执行后并不会立即生效,取决于bulkProcessor设置的 刷新时间 
            bulkProcessor.flush();
        } catch (Exception e) {
            logger.error(e.getMessage());
        } finally {
            try {
                rs.close();
                ps.close();
                conn.close();
                boolean terminatedFlag = bulkProcessor.awaitClose(150L, TimeUnit.SECONDS);
                logger.info(terminatedFlag);
            } catch (Exception e) {
                logger.error(e.getMessage());
            }
        }
    }


    private BulkProcessor getBulkProcessor(RestHighLevelClient client) {
        BulkProcessor bulkProcessor = null;
        try {
            BulkProcessor.Listener listener = new BulkProcessor.Listener() {
                @Override
                public void beforeBulk(long executionId, BulkRequest request) {
                    logger.info("Try to insert data number : " + request.numberOfActions());
                }

                @Override
                public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                    logger.info("************** Success insert data number : " + request.numberOfActions() + " , id: "
                        + executionId);
                }

                @Override
                public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                    logger.error("Bulk is unsuccess : " + failure + ", executionId: " + executionId);
                }
            };
            BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer = (request, bulkListener) -> client
                .bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
            BulkProcessor.Builder builder = BulkProcessor.builder(bulkConsumer, listener);
            builder.setBulkActions(5000);
            builder.setBulkSize(new ByteSizeValue(100L, ByteSizeUnit.MB));
            builder.setConcurrentRequests(10);
            builder.setFlushInterval(TimeValue.timeValueSeconds(100L));
            builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSecon ds(1L), 3));
            // 注意点:让参数设置生效
            bulkProcessor = builder.build();
        } catch (Exception e) {
            e.printStackTrace();
            try {
                bulkProcessor.awaitClose(100L, TimeUnit.SECONDS);
            } catch (Exception e1) {
                logger.error(e1.getMessage());
            }
        } 
        return bulkProcessor;
    }
}

BulkProcessor 官网介绍

https://www.elastic.co/guide/en/elasticsearch/client/java-api/7.3/java-docs-bulk-processor.html

?

八:Controller

package com.lagou.es.controller;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.lagou.es.service.PositionService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import java.io.IOException;
import java.util.List;
import java.util.Map;

@Controller
public class PositionController {

    @Autowired
    private PositionService service;

    //测试范文页面 
    @GetMapping({"/", "index"})
    public String indexPage() {
        return "index";
    }

    @GetMapping("/search/{keyword}/{pageNo}/{pageSize}")
    @ResponseBody
    public List<Map<String, Object>> searchPosition(@PathVariable("keyword") String keyword,
        @PathVariable("pageNo") int pageNo, @PathVariable("pageSize") int pageSize) throws IOException {
        List<Map<String, Object>> list = service.searchPos(keyword, pageNo, pageSize);
        System.out.println(list);
        return list;
    }

    @RequestMapping("/importAll")
    @ResponseBody
    public String importAll() {
        try {
            service.importAll();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return "success";
    }
}

?十:启动类

package com.lagou.es;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SearchApplication {

    public static void main(String[] args) {
        SpringApplication.run(SearchApplication.class, args);
    }
}

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-03-30 18:32:03  更:2022-03-30 18:33:05 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 15:56:54-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码