首先我们先看看Elastic的官网:https://www.elastic.co/cn/
Elastic有一条完整的产品线:Elasticsearch、Kibana、Logstash等,前三个简称为ELK技术栈。
Elasticsearch:
?
它有以下的特点:
①分布式,不需要人工去搭建集群
②Restful风格,比较容易上手
③搜索时,数据更新在Elasticsearch中几乎是完全同步的
?倒排索引:
?与传统的按照索引去查询数据不同,它是先查询键,再根据键去查询值。
Lucene:
它是一个库,用来创建倒排索引的,Elasticsearch是基于它的开源实时分布式搜索和分析引擎。
这个引擎安装起来比较复杂,就不细说了,接下来直接进入正题,在项目中具体的使用。
首先我们新建一个搜索服务:
因为是微服务,所以我的主项目的依赖是:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<packaging>pom</packaging>
<modules>
<module>common_api</module>
</modules>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.4.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.blb</groupId>
<artifactId>wisdom_education</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>wisdom_education</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
<spring.cloud-version>Hoxton.SR8</spring.cloud-version>
<spring-cloud-alibaba.version>2.2.5.RELEASE</spring-cloud-alibaba.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring.cloud-version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>${spring-cloud-alibaba.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
想要在搜索服务中添加这个引擎,得先导入它依赖:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.blb</groupId>
<artifactId>wisdom_education</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<groupId>com.blb</groupId>
<artifactId>wisdom_education_elasticsearch</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>wisdom_education_elasticsearch</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.blb</groupId>
<artifactId>common_api</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
这是我整个服务所需要的所有依赖,可以根据需求添加;
接着在配置文件中配置uri:
先把配置文件修改一下把后缀名改成yaml结尾。
spring:
application:
name: wisdom-education-elasticsearch
cloud:
nacos:
server-addr: localhost:8848
elasticsearch:
rest:
uris: 192.168.64.200:9200
rabbitmq:
host: 192.168.64.200
port: 5672
virtual-host: myhost
username: guest
password: guest
listener:
simple:
acknowledge-mode: manual #消费者手动确认
redis:
host: 192.168.64.200
port: 6379
server:
port: 8801
这里的IP根据系统来,安装的是windows版本的话,IP就是本机IP,Linux系统安装的话,IP就是虚拟机的IP。
然后是我的Nacos上面的配置:
spring:
application:
name: wisdom-education-elasticsearch
cloud:
nacos:
server-addr: localhost:8848
elasticsearch:
rest:
uris: 192.168.64.200:9200
server:
port: 8801
上面的两个配置文件里的配置包含了其它的配置比如Nacos注册配置中心,不需要可以删掉在本地配置也行(比如redis),推荐尽量配置Nacos中。RabbitMQ是必要的,后面将数据同步需要配置,所以提前配置也行。接着我们将需要进行过滤的数据同步到Elasticsearch服务中:
通过SpringCloud的组件feign调用课程服务的接口;
这人是课程数据服务的controller的接口
接着我们在搜索服务中调用它:
值得注意的是,调用的路径不能出错,以及@FeignClient("wisdom-education-course") 括号里的服务名称要和课程服务里的服务名一致,不然调用会失败。
?接着我们调用初始化键的方法,去创建索引。
首先写DAO层:
这是导的包,有些包导错的话,会出问题,不确定可以对着导:
import com.alibaba.fastjson.JSON;
import com.blb.common.entity.PageEntity;
import com.blb.wisdom_education_elasticsearch.entity.ElasticEntity;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;
import org.springframework.util.StringUtils;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
其中在Entity中自定义了一个实体类:
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ElasticEntity {
private String id;
private Object data;
}
然后就是DAO层里的接口了,其中包含了索引键的CRUD和课程的排序和高亮显示:
@Slf4j
@Repository
public class ElasticsearchDAO {
@Autowired
private RestHighLevelClient client;
/**
* 判断索引存在
* @param indexName
* @return
* @throws IOException
*/
public boolean existIndex(String indexName) throws IOException {
//创建索引查询的请求
GetIndexRequest request = new GetIndexRequest(indexName);
//判断索引是否存在
boolean exists = client.indices().exists(request, RequestOptions.DEFAULT);
log.info("{}是否存在{}",indexName,exists);
return exists;
}
/**
* 删除索引
* @param indexName
* @throws IOException
*/
public void deleteIndex(String indexName) throws IOException {
//删除索引请求
DeleteIndexRequest request = new DeleteIndexRequest(indexName);
//发送删除请求
AcknowledgedResponse delete = client.indices().delete(request, RequestOptions.DEFAULT);
log.info("{}删除成功",indexName);
}
/**
* 创建索引
* @param indexName
* @throws IOException
*/
public void createIndex(String indexName) throws IOException {
//创建索引请求
CreateIndexRequest request = new CreateIndexRequest(indexName);
//发送创建索引请求
CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT);
log.info("{}创建成功");
}
/**
* 批量插入
* @param indexName
* @param list
* @throws IOException
*/
public void insertBatch(String indexName, List<ElasticEntity> list) throws IOException {
//创建批量操作的请求
BulkRequest request = new BulkRequest(indexName);
//请求加入每个插入数据
list.forEach(entity -> {
//每个索引请求,设置id和数据
request.add(new IndexRequest().id(entity.getId()).source(JSON.toJSONString(entity.getData()),
XContentType.JSON));
});
//执行批量操作
client.bulk(request,RequestOptions.DEFAULT);
log.info("批量插入完成");
}
/**
* 按多个条件搜索内容
* @param indexName 索引名
* @param from 开始位置
* @param size 分页长度
* @param map 条件集合
* @param sort 排序集合
* @param clazz 搜索的类型
* @param <T>
* @return
*/
public <T> PageEntity<T> searchPageByMap(String indexName,int from,int size,
Map<String,String> map,Map<String,String> sort,
Class<T> clazz,
String... highlightFields) throws IOException {
//创建搜索请求
SearchRequest request = new SearchRequest(indexName);
//请求生成器
SearchSourceBuilder builder = new SearchSourceBuilder();
//创建bool组合查询
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
//如果搜索条件不为空,就设置搜索条件
if(map != null && map.size() > 0){
for(String key : map.keySet()){
String value = map.get(key);
if(!StringUtils.isEmpty(value)){
//设置过滤条件
boolQuery.filter(QueryBuilders.matchPhraseQuery(key,value));
}
}
}
if(boolQuery.filter().size() > 0){
//如果设置了过滤条件,就按过滤搜索
builder.query(boolQuery);
}
//设置分页参数
builder.from(from);
builder.size(size);
//设置排序 field type
if(sort != null && sort.size() > 0){
builder.sort(sort.get("field"), SortOrder.fromString(sort.get("type")));
}
//创建高亮生成器
HighlightBuilder highlightBuilder = new HighlightBuilder();
//设置高亮字段
for(String field : highlightFields){
highlightBuilder.field(field);
}
//设置前后标签
highlightBuilder.preTags("<span style='color:red'>").postTags("</span>");
builder.highlighter(highlightBuilder);
//执行搜索获得结果
request.source(builder);
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
SearchHit[] hits = response.getHits().getHits();
List<T> data = new ArrayList<>();
//将JSON格式的数据转换为Java对象
for(SearchHit hit : hits){
T res = JSON.parseObject(hit.getSourceAsString(),clazz);
data.add(res);
//获得所有高亮的字段
Map<String, HighlightField> hFields = hit.getHighlightFields();
for(String hField : hFields.keySet()){
//获得反射字段
try {
Field declaredField = clazz.getDeclaredField(hField);
//将字段的值替换为带高亮效果的属性值
declaredField.setAccessible(true);
Text[] fragments = hFields.get(hField).fragments();
if(declaredField != null && fragments.length > 0){
declaredField.set(res,fragments[0].string());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
//返回分页数据
return new PageEntity<>(from,response.getHits().getTotalHits().value,size,data);
}
/**
* 添加或更新数据
*/
public void saveOrUpdate(String indexName,ElasticEntity entity) throws IOException {
IndexRequest request = new IndexRequest(indexName);
request.id(entity.getId()).source(JSON.toJSONString(entity.getData()), XContentType.JSON);
IndexResponse response = client.index(request, RequestOptions.DEFAULT);
log.info("{}添加或更新数据成功 {}",indexName, JSON.toJSONString(response));
}
/**
* 通过条件删除对象
*/
public void deleteByQuery(String indexName, QueryBuilder builder) throws IOException {
DeleteByQueryRequest request = new DeleteByQueryRequest(indexName);
request.setQuery(builder);
BulkByScrollResponse response = client.deleteByQuery(request, RequestOptions.DEFAULT);
log.info("{}批查询删除数据成功 {}",indexName, JSON.toJSONString(response));
}
}
在搜索服务的Service层写一个接口:
/**
* 初始化课程索引
*/
void initCourseIndex();
接着去写它的实现类实现这个接口:
@Override
public void initCourseIndex() {
//查询课程索引是否存在
try {
boolean exist = elasticsearchDAO.existIndex(INDEX_NAME);
if(exist){
//删除原有索引
elasticsearchDAO.deleteIndex(INDEX_NAME);
}
//创建索引
elasticsearchDAO.createIndex(INDEX_NAME);
//获得课程服务的所有课程信息
List<Course> courses = courseFeignClient.getAllCourses();
List<ElasticEntity> entities = new ArrayList<>();
courses.forEach(course -> {
entities.add(new ElasticEntity(String.valueOf(course.getId()),course));
log.info("course:{}",course);
});
//批量添加到ES中
elasticsearchDAO.insertBatch(INDEX_NAME,entities);
log.info("courses:{},entities:{}",courses.size(),entities.size());
} catch (IOException e) {
e.printStackTrace();
}
}
我们通过这个接口去对键做一个初始化操作,在Test测试中进行。
如下则表示初始化成功了,还有别忘了,Elasticsearch要保持运行,不然会报连接超时:
?接着我们去搜索服务通过Feign调用这个接口,获取数据:这也是上面提到的Feign调用
结果如下则说明数据导入成功了。
?接着有了数据我们就可以去写Service层和Controller层的接口了;
Service:
public interface ICourseService {
/**
* 初始化课程索引
*/
void initCourseIndex();
/**
* 分页搜索课程
* @param args
* @return
*/
PageEntity<Course> searchCoursePage(Map<String,String> args);
/**
* 添加或更新课程
* @param course
*/
void saveOrUpdate(Course course);
/**
* 按id删除课程
* @param id
*/
void removeById(String id);
}
实现类:
import com.alibaba.fastjson.JSON;
import com.blb.common.entity.Course;
import com.blb.common.entity.PageEntity;
import com.blb.wisdom_education_elasticsearch.dao.ElasticsearchDAO;
import com.blb.wisdom_education_elasticsearch.entity.ElasticEntity;
import com.blb.wisdom_education_elasticsearch.feign.CourseFeign;
import com.blb.wisdom_education_elasticsearch.service.ICourseService;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.index.query.QueryBuilders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@Slf4j
@Service
public class CourseServiceImpl implements ICourseService {
public static final String INDEX_NAME = "course";
@Autowired
private CourseFeign courseFeignClient;
@Autowired
private ElasticsearchDAO elasticsearchDAO;
@Override
public void initCourseIndex() {
//查询课程索引是否存在
try {
boolean exist = elasticsearchDAO.existIndex(INDEX_NAME);
if(exist){
//删除原有索引
elasticsearchDAO.deleteIndex(INDEX_NAME);
}
//创建索引
elasticsearchDAO.createIndex(INDEX_NAME);
//获得课程服务的所有课程信息
List<Course> courses = courseFeignClient.getAllCourses();
List<ElasticEntity> entities = new ArrayList<>();
courses.forEach(course -> {
entities.add(new ElasticEntity(String.valueOf(course.getId()),course));
log.info("course:{}",course);
});
//批量添加到ES中
elasticsearchDAO.insertBatch(INDEX_NAME,entities);
log.info("courses:{},entities:{}",courses.size(),entities.size());
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public PageEntity<Course> searchCoursePage(Map<String, String> args) {
int current = 1;
int size = 1;
Map<String,String> search = null;
Map<String,String> sort = null;
if(args.containsKey("current")){
current = Integer.valueOf(args.get("current"));
}
if(args.containsKey("size")){
size = Integer.valueOf(args.get("size"));
}
if(args.containsKey("search")){
search = JSON.parseObject(args.get("search"),Map.class);
}
if(args.containsKey("sort")){
sort = JSON.parseObject(args.get("sort"),Map.class);
}
try {
return elasticsearchDAO.searchPageByMap("course",(current - 1) * size,size,search,sort,Course.class,
"courseName","brief");
} catch (IOException e) {
log.error("出现异常",e);
throw new RuntimeException(e);
}
}
/**
* 添加或更新课程
* @param course
*/
public void saveOrUpdate(Course course){
try {
elasticsearchDAO.saveOrUpdate(INDEX_NAME,
new ElasticEntity(String.valueOf(course.getId()),course));
} catch (IOException e) {
log.error("出现异常",e);
throw new RuntimeException(e);
}
}
/**
* 按id删除课程
* @param id
*/
public void removeById(String id){
try {
elasticsearchDAO.deleteByQuery(INDEX_NAME, QueryBuilders.termQuery("id",id));
} catch (IOException e) {
log.error("出现异常",e);
throw new RuntimeException(e);
}
}
}
然后是Controller层:
import com.blb.common.entity.Course;
import com.blb.common.entity.PageEntity;
import com.blb.wisdom_education_elasticsearch.service.ICourseService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Map;
@RestController
@Slf4j
public class CourseController {
@Autowired
private ICourseService service;
@RequestMapping("/search-courses")
public ResponseEntity<PageEntity<Course>> searchCoursePage(@RequestBody Map<String, String> map) {
PageEntity<Course> page = service.searchCoursePage(map);
log.info("map:{}" + map);
log.info("page:{}" + page);
return ResponseEntity.ok(page);
}
}
到此,我们的搜索服务久差不多完成了。不考虑数据同步这一块的话,已经具备了按多条件的查询、排序,以及输入文字的高亮显示:
?那么问题来了,我们这个是一步一步的将数据挪过来的,当数据库的数据发生了变化,我们怎么去对这个数据进行一个同步的跟新呢?
接下来就要介绍的工具:通过RabbitMQ消息队列+Canal同步工具实现对数据库的数据的一个实时监控,这个工具的安装久不细说了,有很多教程。我们直接往下去实现好了。
首先我们需要新加一个数据同步服务:继承父项目,下面是依赖:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.blb</groupId>
<artifactId>wisdom_education</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<groupId>com.blb</groupId>
<artifactId>wisdom_education_datasync</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>wisdom_education_datasync</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.xpand</groupId>
<artifactId>starter-canal</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.29</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.0.1</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
<groupId>com.blb</groupId>
<artifactId>common_api</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
然后就是RabbitMQ的一些配置了。
在登录上RabbitMQ后,新建一个admin 账户:
创建两个交换机:
?名字尽量取好点,点添加就好:
?好需要创建两个队列:
还有Canal的安装直接用window是的比较方便,要是启动报错记得去,可以去一个配置文件中处理一行配置,具体是什么忘了,希望别碰到最好,哈哈。
我们得先看看在数据库中的binlog配置
SHOW VARIABLES LIKE '%log_bin%'
ON表示开启了,OFF的话需要去设置它开启:
??windows配置文件是MySQL安装目录的my.ini
???linux在/etc/my.cnf?
???修改:
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server_id=1
进入mysql,创建canal用户并授权:
create user canal@'%'IDENTIFIED WITH mysql_native_password BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
还需要添加配置:
进入canal目录,修改配置文件:
我们先在数据库中输入命令:
show master status
接着去修改canal配置:?
vim conf/example/instance.peoperties
windows的话直接去目录找:
然后启动canal:
它主要的问题有:
? ? ? 1.?异常信息?authentication?error,数据库账号和密码配置错误 ??????2.?异常信息?can't?find?position,检查配置的文件名和位置,再删除conf/example/meta.dat?重启 ??????3.?客户端版本兼容问题,canal的版本和客户端的版本要一致 ?
然后去服务中配置它们:这是我的数据同步服务的配置文件的配置:
canal.client.instances.example.host=127.0.0.1
canal.client.instances.example.port=11111
canal.client.instances.example.batchSize=1000
spring.application.name=wisdom-education-datasync
# 注册nacos
spring.cloud.nacos.discovery.server-addr=127.0.0.1:8848
# nacos配置中心地址
spring.cloud.nacos.config.server-addr=127.0.0.1:8848
# 配置文件的前缀
spring.cloud.nacos.config.prefix=wisdom-education-datasync
# 后缀
spring.cloud.nacos.config.file-extension=properties
# profile
spring.profiles.active=dev
#开启全部端点
management.endpoints.web.exposure.include=*
spring.rabbitmq.host=192.168.64.200
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=myhost
?Nacos部分:
server.port=8812
spring.application.name=wisdom-education-datasync
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/wisdom_education_advertisement?serverTimeZone=UTC&useUnicode=true&characterEncoding=UTF-8
spring.datasource.username=root
spring.datasource.password=root
mybatis-plus.mapper-locations=classpath:mapper/*.xml
mybatis-plus.configuration.map-underscore-to-camel-case=true
mybatis-plus.type-aliases-package=com.blb.common_api.entity
spring.redis.host=192.168.64.200
spring.redis.port=6379
spring.redis.database=0
spring.redis.jedis.pool.max-active=100
spring.redis.jedis.pool.max-wait=100ms
spring.redis.jedis.pool.max-idle=100
spring.redis.jedis.pool.min-idle=10
我们得写一个接口,通过广告课程数据的id去查询所有课程的接口:
这里就不细说了,很简单的一个接口:同样的用feign去调用它
接下来是RabbitMQ的配置类;
package com.blb.wisdom_education_datasync.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
public static final String QUEUE_SAVE = "edu.course.save.queue";
public static final String QUEUE_REMOVE = "edu.course.remove.queue";
public static final String EXCHANGE_COURSE = "edu.course.exchange";
public static final String KEY_SAVE = "edu.course.save.key";
public static final String KEY_REMOVE = "edu.course.remove.key";
//生成队列
@Bean
public Queue queueSave(){
return new Queue(QUEUE_SAVE);
}
@Bean
public Queue queueRemove(){
return new Queue(QUEUE_REMOVE);
}
//生成主题模式交换机
@Bean
public TopicExchange exchange(){
return new TopicExchange(EXCHANGE_COURSE);
}
//生成绑定
@Bean
public Binding bindingSave(){
return BindingBuilder.bind(queueSave()).to(exchange()).with(KEY_SAVE);
}
@Bean
public Binding bindingRemove(){
return BindingBuilder.bind(queueRemove()).to(exchange()).with(KEY_REMOVE);
}
}
?我们要实现数据的同步的话,整个流程大致是这个样的;
?所以我们在搜索服务中也需要加一个listener监听器:
package com.blb.wisdom_education_elasticsearch.listener;
import com.alibaba.fastjson.JSON;
import com.blb.common.entity.Course;
import com.blb.wisdom_education_elasticsearch.service.ICourseService;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 课程消息监听器
*/
@Slf4j
@Component
public class CourseMsgListener {
public static final String QUEUE_SAVE = "edu.course.save.queue";
public static final String QUEUE_REMOVE = "edu.course.remove.queue";
public static final String EXCHANGE_COURSE = "edu.course.exchange";
public static final String KEY_SAVE = "edu.course.save.key";
public static final String KEY_REMOVE = "edu.course.remove.key";
@Autowired
private ICourseService courseService;
//监听添加或更新操作的队列
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = QUEUE_SAVE, durable = "true"),
exchange = @Exchange(value = EXCHANGE_COURSE, ignoreDeclarationExceptions = "true"),
key = KEY_SAVE))
public void receiveSaveMsg(String json, Message message, Channel channel) {
//收到同步服务发送的课程对象
Course course = JSON.parseObject(json, Course.class);
courseService.saveOrUpdate(course);
log.info("接收课程数据:{}", course);
}
//监听删除操作的队列
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = QUEUE_REMOVE, durable = "true"),
exchange = @Exchange(value = EXCHANGE_COURSE, ignoreDeclarationExceptions = "true"),
key = KEY_REMOVE))
public void receiveRemoveMsg(String id, Message message, Channel channel) {
courseService.removeById(id);
log.info("接收删除id:{}", id);
}
}
然后是数据同步服务的监听器:
package com.blb.wisdom_education_datasync.listener;
import com.alibaba.fastjson.JSON;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.blb.common.entity.Course;
import com.blb.wisdom_education_datasync.config.RabbitMQConfig;
import com.blb.wisdom_education_datasync.feign.CourseFeignClient;
import com.xpand.starter.canal.annotation.CanalEventListener;
import com.xpand.starter.canal.annotation.ListenPoint;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.List;
/**
* 监听课程表的监听器
*/
@CanalEventListener
public class CourseCanalListener {
@Autowired
private CourseFeignClient courseFeignClient;
@Autowired
private RabbitTemplate rabbitTemplate;
@ListenPoint(schema = "wisdom_education_course",table = "course")
public void courseChange(CanalEntry.EventType eventType, CanalEntry.RowData rowData){
System.out.println("1111");
//获得操作的类型
String type = eventType.toString();
if("INSERT".equals(type) || "UPDATE".equals(type)){
List<CanalEntry.Column> list = rowData.getAfterColumnsList();
//获得课程id
String id = null;
for(CanalEntry.Column column : list){
if("id".equals(column.getName())){
id = column.getValue();
break;
}
}
//调用课程服务查询课程信息
Course course = courseFeignClient.getCourseById(Long.valueOf(id));
//发送消息给搜索服务
System.out.println(type+":"+course);
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_COURSE,RabbitMQConfig.KEY_SAVE, JSON.toJSONString(course));
}else if("DELETE".equals(type)){
List<CanalEntry.Column> list = rowData.getBeforeColumnsList();
//获得课程id
String id = null;
for(CanalEntry.Column column : list){
if("id".equals(column.getName())){
id = column.getValue();
break;
}
}
//发送消息给搜索服务
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_COURSE,RabbitMQConfig.KEY_REMOVE, id);
}
}
}
至此加上前端的代码就可以完成一个Elasticsearch+RabbitMQ+Canal的一个数据过滤和数据同步的效果了。运行有bug的话,希望自己加油d,RabbiotMQ消息保证问题久不说了添加一些小配置即可。
完结,谢谢。
都看到这儿了不给个赞或者订阅嘛??
|