IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 通过Elasticsearch+RabbitMQ+Canal实现对首页的课程的多条件过滤和数据同步 -> 正文阅读

[大数据]通过Elasticsearch+RabbitMQ+Canal实现对首页的课程的多条件过滤和数据同步

首先我们先看看Elastic的官网:https://www.elastic.co/cn/

Elastic有一条完整的产品线:Elasticsearch、Kibana、Logstash等,前三个简称为ELK技术栈。

Elasticsearch:

?

它有以下的特点:

①分布式,不需要人工去搭建集群

②Restful风格,比较容易上手

③搜索时,数据更新在Elasticsearch中几乎是完全同步的

?倒排索引:

?与传统的按照索引去查询数据不同,它是先查询键,再根据键去查询值。

Lucene:

它是一个库,用来创建倒排索引的,Elasticsearch是基于它的开源实时分布式搜索和分析引擎。

这个引擎安装起来比较复杂,就不细说了,接下来直接进入正题,在项目中具体的使用。

首先我们新建一个搜索服务:

因为是微服务,所以我的主项目的依赖是:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <packaging>pom</packaging>
    <modules>
        <module>common_api</module>
    </modules>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.4.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.blb</groupId>
    <artifactId>wisdom_education</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>wisdom_education</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
        <spring.cloud-version>Hoxton.SR8</spring.cloud-version>
        <spring-cloud-alibaba.version>2.2.5.RELEASE</spring-cloud-alibaba.version>
    </properties>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring.cloud-version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>com.alibaba.cloud</groupId>
                <artifactId>spring-cloud-alibaba-dependencies</artifactId>
                <version>${spring-cloud-alibaba.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

想要在搜索服务中添加这个引擎,得先导入它依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.blb</groupId>
        <artifactId>wisdom_education</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>
    <groupId>com.blb</groupId>
    <artifactId>wisdom_education_elasticsearch</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>wisdom_education_elasticsearch</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>com.blb</groupId>
            <artifactId>common_api</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
        </dependency>

        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-openfeign</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

这是我整个服务所需要的所有依赖,可以根据需求添加;

接着在配置文件中配置uri:

先把配置文件修改一下把后缀名改成yaml结尾。

spring:
  application:
    name: wisdom-education-elasticsearch
  cloud:
    nacos:
      server-addr: localhost:8848
  elasticsearch:
    rest:
      uris: 192.168.64.200:9200
  rabbitmq:
    host: 192.168.64.200
    port: 5672
    virtual-host: myhost
    username: guest
    password: guest
    listener:
      simple:
        acknowledge-mode: manual #消费者手动确认
  redis:
    host: 192.168.64.200
    port: 6379
server:
  port: 8801

这里的IP根据系统来,安装的是windows版本的话,IP就是本机IP,Linux系统安装的话,IP就是虚拟机的IP。

然后是我的Nacos上面的配置:

spring:
  application:
    name: wisdom-education-elasticsearch
  cloud:
    nacos:
      server-addr: localhost:8848
  elasticsearch:
    rest:
      uris: 192.168.64.200:9200
server:
  port: 8801

上面的两个配置文件里的配置包含了其它的配置比如Nacos注册配置中心,不需要可以删掉在本地配置也行(比如redis),推荐尽量配置Nacos中。RabbitMQ是必要的,后面将数据同步需要配置,所以提前配置也行。接着我们将需要进行过滤的数据同步到Elasticsearch服务中:

通过SpringCloud的组件feign调用课程服务的接口;

这人是课程数据服务的controller的接口

接着我们在搜索服务中调用它:

值得注意的是,调用的路径不能出错,以及@FeignClient("wisdom-education-course") 括号里的服务名称要和课程服务里的服务名一致,不然调用会失败。

?接着我们调用初始化键的方法,去创建索引。

首先写DAO层:

这是导的包,有些包导错的话,会出问题,不确定可以对着导:

import com.alibaba.fastjson.JSON;
import com.blb.common.entity.PageEntity;
import com.blb.wisdom_education_elasticsearch.entity.ElasticEntity;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;
import org.springframework.util.StringUtils;

import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

其中在Entity中自定义了一个实体类:

@Data
@AllArgsConstructor
@NoArgsConstructor
public class ElasticEntity {

    private String id;
    private Object data;
}

然后就是DAO层里的接口了,其中包含了索引键的CRUD和课程的排序和高亮显示:

@Slf4j
@Repository
public class ElasticsearchDAO {

    @Autowired
    private RestHighLevelClient client;

    /**
     * 判断索引存在
     * @param indexName
     * @return
     * @throws IOException
     */
    public boolean existIndex(String indexName) throws IOException {
        //创建索引查询的请求
        GetIndexRequest request = new GetIndexRequest(indexName);
        //判断索引是否存在
        boolean exists = client.indices().exists(request, RequestOptions.DEFAULT);
        log.info("{}是否存在{}",indexName,exists);
        return exists;
    }

    /**
     * 删除索引
     * @param indexName
     * @throws IOException
     */
    public void deleteIndex(String indexName) throws IOException {
        //删除索引请求
        DeleteIndexRequest request = new DeleteIndexRequest(indexName);
        //发送删除请求
        AcknowledgedResponse delete = client.indices().delete(request, RequestOptions.DEFAULT);
        log.info("{}删除成功",indexName);
    }

    /**
     * 创建索引
     * @param indexName
     * @throws IOException
     */
    public void createIndex(String indexName) throws IOException {
        //创建索引请求
        CreateIndexRequest request = new CreateIndexRequest(indexName);
        //发送创建索引请求
        CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT);
        log.info("{}创建成功");
    }

    /**
     * 批量插入
     * @param indexName
     * @param list
     * @throws IOException
     */
    public void insertBatch(String indexName, List<ElasticEntity> list) throws IOException {
        //创建批量操作的请求
        BulkRequest request = new BulkRequest(indexName);
        //请求加入每个插入数据
        list.forEach(entity -> {
            //每个索引请求,设置id和数据
            request.add(new IndexRequest().id(entity.getId()).source(JSON.toJSONString(entity.getData()),
                    XContentType.JSON));
        });
        //执行批量操作
        client.bulk(request,RequestOptions.DEFAULT);
        log.info("批量插入完成");
    }
    /**
     * 按多个条件搜索内容
     * @param indexName 索引名
     * @param from 开始位置
     * @param size 分页长度
     * @param map 条件集合
     * @param sort 排序集合
     * @param clazz 搜索的类型
     * @param <T>
     * @return
     */
    public <T> PageEntity<T> searchPageByMap(String indexName,int from,int size,
                                             Map<String,String> map,Map<String,String> sort,
                                             Class<T> clazz,
                                             String... highlightFields) throws IOException {
        //创建搜索请求
        SearchRequest request = new SearchRequest(indexName);
        //请求生成器
        SearchSourceBuilder builder = new SearchSourceBuilder();
        //创建bool组合查询
        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
        //如果搜索条件不为空,就设置搜索条件
        if(map != null && map.size() > 0){
            for(String key : map.keySet()){
                String value = map.get(key);
                if(!StringUtils.isEmpty(value)){
                    //设置过滤条件
                    boolQuery.filter(QueryBuilders.matchPhraseQuery(key,value));
                }
            }
        }
        if(boolQuery.filter().size() > 0){
            //如果设置了过滤条件,就按过滤搜索
            builder.query(boolQuery);
        }
        //设置分页参数
        builder.from(from);
        builder.size(size);
        //设置排序 field type
        if(sort != null && sort.size() > 0){
            builder.sort(sort.get("field"), SortOrder.fromString(sort.get("type")));
        }
        //创建高亮生成器
        HighlightBuilder highlightBuilder = new HighlightBuilder();
        //设置高亮字段
        for(String field : highlightFields){
            highlightBuilder.field(field);
        }
        //设置前后标签
        highlightBuilder.preTags("<span style='color:red'>").postTags("</span>");
        builder.highlighter(highlightBuilder);
        //执行搜索获得结果
        request.source(builder);
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        SearchHit[] hits = response.getHits().getHits();
        List<T> data = new ArrayList<>();
        //将JSON格式的数据转换为Java对象
        for(SearchHit hit : hits){
            T res = JSON.parseObject(hit.getSourceAsString(),clazz);
            data.add(res);
            //获得所有高亮的字段
            Map<String, HighlightField> hFields = hit.getHighlightFields();
            for(String hField : hFields.keySet()){
                //获得反射字段
                try {
                    Field declaredField = clazz.getDeclaredField(hField);
                    //将字段的值替换为带高亮效果的属性值
                    declaredField.setAccessible(true);
                    Text[] fragments = hFields.get(hField).fragments();
                    if(declaredField != null && fragments.length > 0){
                        declaredField.set(res,fragments[0].string());
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        //返回分页数据
        return new PageEntity<>(from,response.getHits().getTotalHits().value,size,data);
    }

    /**
     * 添加或更新数据
     */
    public void saveOrUpdate(String indexName,ElasticEntity entity) throws IOException {
        IndexRequest request = new IndexRequest(indexName);
        request.id(entity.getId()).source(JSON.toJSONString(entity.getData()), XContentType.JSON);
        IndexResponse response = client.index(request, RequestOptions.DEFAULT);
        log.info("{}添加或更新数据成功 {}",indexName, JSON.toJSONString(response));
    }

    /**
     * 通过条件删除对象
     */
    public void deleteByQuery(String indexName, QueryBuilder builder) throws IOException {
        DeleteByQueryRequest request = new DeleteByQueryRequest(indexName);
        request.setQuery(builder);
        BulkByScrollResponse response = client.deleteByQuery(request, RequestOptions.DEFAULT);
        log.info("{}批查询删除数据成功 {}",indexName, JSON.toJSONString(response));
    }
}

在搜索服务的Service层写一个接口:

    /**
     * 初始化课程索引
     */
    void initCourseIndex();

接着去写它的实现类实现这个接口:

@Override
    public void initCourseIndex() {
        //查询课程索引是否存在
        try {
            boolean exist = elasticsearchDAO.existIndex(INDEX_NAME);
            if(exist){
                //删除原有索引
                elasticsearchDAO.deleteIndex(INDEX_NAME);
            }
            //创建索引
            elasticsearchDAO.createIndex(INDEX_NAME);
            //获得课程服务的所有课程信息
            List<Course> courses = courseFeignClient.getAllCourses();
            List<ElasticEntity> entities = new ArrayList<>();
            courses.forEach(course -> {
                entities.add(new ElasticEntity(String.valueOf(course.getId()),course));
                log.info("course:{}",course);
            });
            //批量添加到ES中
            elasticsearchDAO.insertBatch(INDEX_NAME,entities);
            log.info("courses:{},entities:{}",courses.size(),entities.size());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

我们通过这个接口去对键做一个初始化操作,在Test测试中进行。

如下则表示初始化成功了,还有别忘了,Elasticsearch要保持运行,不然会报连接超时:

?接着我们去搜索服务通过Feign调用这个接口,获取数据:这也是上面提到的Feign调用

结果如下则说明数据导入成功了。

?接着有了数据我们就可以去写Service层和Controller层的接口了;

Service:

public interface ICourseService {
    /**
     * 初始化课程索引
     */
    void initCourseIndex();

    /**
     * 分页搜索课程
     * @param args
     * @return
     */
    PageEntity<Course> searchCoursePage(Map<String,String> args);

    /**
     * 添加或更新课程
     * @param course
     */
    void saveOrUpdate(Course course);

    /**
     * 按id删除课程
     * @param id
     */
    void removeById(String id);
}

实现类:


import com.alibaba.fastjson.JSON;
import com.blb.common.entity.Course;
import com.blb.common.entity.PageEntity;
import com.blb.wisdom_education_elasticsearch.dao.ElasticsearchDAO;
import com.blb.wisdom_education_elasticsearch.entity.ElasticEntity;
import com.blb.wisdom_education_elasticsearch.feign.CourseFeign;
import com.blb.wisdom_education_elasticsearch.service.ICourseService;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.index.query.QueryBuilders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

@Slf4j
@Service
public class CourseServiceImpl implements ICourseService {

    public static final String INDEX_NAME = "course";

    @Autowired
    private CourseFeign courseFeignClient;

    @Autowired
    private ElasticsearchDAO elasticsearchDAO;

    @Override
    public void initCourseIndex() {
        //查询课程索引是否存在
        try {
            boolean exist = elasticsearchDAO.existIndex(INDEX_NAME);
            if(exist){
                //删除原有索引
                elasticsearchDAO.deleteIndex(INDEX_NAME);
            }
            //创建索引
            elasticsearchDAO.createIndex(INDEX_NAME);
            //获得课程服务的所有课程信息
            List<Course> courses = courseFeignClient.getAllCourses();
            List<ElasticEntity> entities = new ArrayList<>();
            courses.forEach(course -> {
                entities.add(new ElasticEntity(String.valueOf(course.getId()),course));
                log.info("course:{}",course);
            });
            //批量添加到ES中
            elasticsearchDAO.insertBatch(INDEX_NAME,entities);
            log.info("courses:{},entities:{}",courses.size(),entities.size());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }


    @Override
    public PageEntity<Course> searchCoursePage(Map<String, String> args) {
        int current = 1;
        int size = 1;
        Map<String,String> search = null;
        Map<String,String> sort = null;
        if(args.containsKey("current")){
            current = Integer.valueOf(args.get("current"));
        }
        if(args.containsKey("size")){
            size = Integer.valueOf(args.get("size"));
        }
        if(args.containsKey("search")){
            search = JSON.parseObject(args.get("search"),Map.class);
        }
        if(args.containsKey("sort")){
            sort = JSON.parseObject(args.get("sort"),Map.class);
        }
        try {
            return elasticsearchDAO.searchPageByMap("course",(current - 1) * size,size,search,sort,Course.class,
                    "courseName","brief");
        } catch (IOException e) {
            log.error("出现异常",e);
            throw new RuntimeException(e);
        }
    }

    /**
     * 添加或更新课程
     * @param course
     */
    public void saveOrUpdate(Course course){
        try {
            elasticsearchDAO.saveOrUpdate(INDEX_NAME,
                    new ElasticEntity(String.valueOf(course.getId()),course));
        } catch (IOException e) {
            log.error("出现异常",e);
            throw new RuntimeException(e);
        }
    }

    /**
     * 按id删除课程
     * @param id
     */
    public void removeById(String id){
        try {
            elasticsearchDAO.deleteByQuery(INDEX_NAME, QueryBuilders.termQuery("id",id));
        } catch (IOException e) {
            log.error("出现异常",e);
            throw new RuntimeException(e);
        }
    }
}

然后是Controller层:



import com.blb.common.entity.Course;
import com.blb.common.entity.PageEntity;
import com.blb.wisdom_education_elasticsearch.service.ICourseService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Map;

@RestController
@Slf4j
public class CourseController {
    @Autowired
    private ICourseService service;

    @RequestMapping("/search-courses")
    public ResponseEntity<PageEntity<Course>> searchCoursePage(@RequestBody Map<String, String> map) {
        PageEntity<Course> page = service.searchCoursePage(map);
        log.info("map:{}" + map);
        log.info("page:{}" + page);
        return ResponseEntity.ok(page);
    }
}

到此,我们的搜索服务久差不多完成了。不考虑数据同步这一块的话,已经具备了按多条件的查询、排序,以及输入文字的高亮显示:

?那么问题来了,我们这个是一步一步的将数据挪过来的,当数据库的数据发生了变化,我们怎么去对这个数据进行一个同步的跟新呢?

接下来就要介绍的工具:通过RabbitMQ消息队列+Canal同步工具实现对数据库的数据的一个实时监控,这个工具的安装久不细说了,有很多教程。我们直接往下去实现好了。

首先我们需要新加一个数据同步服务:继承父项目,下面是依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.blb</groupId>
        <artifactId>wisdom_education</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>
    <groupId>com.blb</groupId>
    <artifactId>wisdom_education_datasync</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>wisdom_education_datasync</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.xpand</groupId>
            <artifactId>starter-canal</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.29</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.0.1</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-openfeign</artifactId>
        </dependency>
        <dependency>
            <groupId>com.blb</groupId>
            <artifactId>common_api</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

然后就是RabbitMQ的一些配置了。

在登录上RabbitMQ后,新建一个admin 账户:

创建两个交换机:

?名字尽量取好点,点添加就好:

?好需要创建两个队列:

还有Canal的安装直接用window是的比较方便,要是启动报错记得去,可以去一个配置文件中处理一行配置,具体是什么忘了,希望别碰到最好,哈哈。

我们得先看看在数据库中的binlog配置

SHOW VARIABLES LIKE '%log_bin%'

ON表示开启了,OFF的话需要去设置它开启:

??windows配置文件是MySQL安装目录的my.ini

???linux在/etc/my.cnf?

???修改:

   [mysqld]
   log-bin=mysql-bin
   binlog-format=ROW
   server_id=1

进入mysql,创建canal用户并授权:

  create user canal@'%'IDENTIFIED WITH mysql_native_password BY 'canal';
   GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%';
   FLUSH PRIVILEGES;

还需要添加配置:

进入canal目录,修改配置文件:

我们先在数据库中输入命令:

show master status

接着去修改canal配置:?

vim conf/example/instance.peoperties

windows的话直接去目录找:

然后启动canal:

它主要的问题有:

? ? ? 1.?异常信息?authentication?error,数据库账号和密码配置错误
??????2.?异常信息?can't?find?position,检查配置的文件名和位置,再删除conf/example/meta.dat?重启
??????3.?客户端版本兼容问题,canal的版本和客户端的版本要一致
?

然后去服务中配置它们:这是我的数据同步服务的配置文件的配置:

canal.client.instances.example.host=127.0.0.1
canal.client.instances.example.port=11111
canal.client.instances.example.batchSize=1000

spring.application.name=wisdom-education-datasync
# 注册nacos
spring.cloud.nacos.discovery.server-addr=127.0.0.1:8848
# nacos配置中心地址
spring.cloud.nacos.config.server-addr=127.0.0.1:8848
# 配置文件的前缀
spring.cloud.nacos.config.prefix=wisdom-education-datasync
# 后缀
spring.cloud.nacos.config.file-extension=properties
# profile
spring.profiles.active=dev
#开启全部端点
management.endpoints.web.exposure.include=*

spring.rabbitmq.host=192.168.64.200
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=myhost

?Nacos部分:

server.port=8812

spring.application.name=wisdom-education-datasync

spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/wisdom_education_advertisement?serverTimeZone=UTC&useUnicode=true&characterEncoding=UTF-8
spring.datasource.username=root
spring.datasource.password=root

mybatis-plus.mapper-locations=classpath:mapper/*.xml
mybatis-plus.configuration.map-underscore-to-camel-case=true
mybatis-plus.type-aliases-package=com.blb.common_api.entity

spring.redis.host=192.168.64.200
spring.redis.port=6379
spring.redis.database=0
spring.redis.jedis.pool.max-active=100
spring.redis.jedis.pool.max-wait=100ms
spring.redis.jedis.pool.max-idle=100
spring.redis.jedis.pool.min-idle=10

我们得写一个接口,通过广告课程数据的id去查询所有课程的接口:

这里就不细说了,很简单的一个接口:同样的用feign去调用它

接下来是RabbitMQ的配置类;

package com.blb.wisdom_education_datasync.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    public static final String QUEUE_SAVE = "edu.course.save.queue";
    public static final String QUEUE_REMOVE = "edu.course.remove.queue";
    public static final String EXCHANGE_COURSE = "edu.course.exchange";
    public static final String KEY_SAVE = "edu.course.save.key";
    public static final String KEY_REMOVE = "edu.course.remove.key";

    //生成队列
    @Bean
    public Queue queueSave(){
        return new Queue(QUEUE_SAVE);
    }

    @Bean
    public Queue queueRemove(){
        return new Queue(QUEUE_REMOVE);
    }

    //生成主题模式交换机
    @Bean
    public TopicExchange exchange(){
        return new TopicExchange(EXCHANGE_COURSE);
    }

    //生成绑定
    @Bean
    public Binding bindingSave(){
        return BindingBuilder.bind(queueSave()).to(exchange()).with(KEY_SAVE);
    }

    @Bean
    public Binding bindingRemove(){
        return BindingBuilder.bind(queueRemove()).to(exchange()).with(KEY_REMOVE);
    }
}

?我们要实现数据的同步的话,整个流程大致是这个样的;

?所以我们在搜索服务中也需要加一个listener监听器:

package com.blb.wisdom_education_elasticsearch.listener;

import com.alibaba.fastjson.JSON;
import com.blb.common.entity.Course;
import com.blb.wisdom_education_elasticsearch.service.ICourseService;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * 课程消息监听器
 */
@Slf4j
@Component
public class CourseMsgListener {

    public static final String QUEUE_SAVE = "edu.course.save.queue";
    public static final String QUEUE_REMOVE = "edu.course.remove.queue";
    public static final String EXCHANGE_COURSE = "edu.course.exchange";
    public static final String KEY_SAVE = "edu.course.save.key";
    public static final String KEY_REMOVE = "edu.course.remove.key";

    @Autowired
    private ICourseService courseService;

    //监听添加或更新操作的队列
    @RabbitListener(bindings = @QueueBinding(value = @Queue(value = QUEUE_SAVE, durable = "true"),
            exchange = @Exchange(value = EXCHANGE_COURSE, ignoreDeclarationExceptions = "true"),
            key = KEY_SAVE))
    public void receiveSaveMsg(String json, Message message, Channel channel) {
        //收到同步服务发送的课程对象
        Course course = JSON.parseObject(json, Course.class);
        courseService.saveOrUpdate(course);
        log.info("接收课程数据:{}", course);
    }

    //监听删除操作的队列
    @RabbitListener(bindings = @QueueBinding(value = @Queue(value = QUEUE_REMOVE, durable = "true"),
            exchange = @Exchange(value = EXCHANGE_COURSE, ignoreDeclarationExceptions = "true"),
            key = KEY_REMOVE))
    public void receiveRemoveMsg(String id, Message message, Channel channel) {
        courseService.removeById(id);
        log.info("接收删除id:{}", id);
    }
}

然后是数据同步服务的监听器:

package com.blb.wisdom_education_datasync.listener;

import com.alibaba.fastjson.JSON;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.blb.common.entity.Course;
import com.blb.wisdom_education_datasync.config.RabbitMQConfig;
import com.blb.wisdom_education_datasync.feign.CourseFeignClient;
import com.xpand.starter.canal.annotation.CanalEventListener;
import com.xpand.starter.canal.annotation.ListenPoint;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.List;

/**
 * 监听课程表的监听器
 */
@CanalEventListener
public class CourseCanalListener {

    @Autowired
    private CourseFeignClient courseFeignClient;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @ListenPoint(schema = "wisdom_education_course",table = "course")
    public void courseChange(CanalEntry.EventType eventType, CanalEntry.RowData rowData){
        System.out.println("1111");
        //获得操作的类型
        String type = eventType.toString();
        if("INSERT".equals(type) || "UPDATE".equals(type)){
            List<CanalEntry.Column> list = rowData.getAfterColumnsList();
            //获得课程id
            String id = null;
            for(CanalEntry.Column column : list){
                if("id".equals(column.getName())){
                    id = column.getValue();
                    break;
                }
            }
            //调用课程服务查询课程信息
            Course course = courseFeignClient.getCourseById(Long.valueOf(id));
            //发送消息给搜索服务
            System.out.println(type+":"+course);
            rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_COURSE,RabbitMQConfig.KEY_SAVE, JSON.toJSONString(course));
        }else if("DELETE".equals(type)){
            List<CanalEntry.Column> list = rowData.getBeforeColumnsList();
            //获得课程id
            String id = null;
            for(CanalEntry.Column column : list){
                if("id".equals(column.getName())){
                    id = column.getValue();
                    break;
                }
            }
            //发送消息给搜索服务
            rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_COURSE,RabbitMQConfig.KEY_REMOVE, id);
        }
   }
}

至此加上前端的代码就可以完成一个Elasticsearch+RabbitMQ+Canal的一个数据过滤和数据同步的效果了。运行有bug的话,希望自己加油d,RabbiotMQ消息保证问题久不说了添加一些小配置即可。

完结,谢谢。

都看到这儿了不给个赞或者订阅嘛??

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-09-04 01:18:22  更:2022-09-04 01:21:36 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/24 3:20:18-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码