1.ik 热词及近义词 远程字典的获取方式
先简单看一下源码,需要注意三点:1. 每次轮询校验时,请求头会带上 "If-Modified-Since" 和 "If-None-Match";2. 通过响应头 "ETag" 和 "Last-Modified" 判断文件是否发生变化;3. 词库有更新时会调用 Dictionary.getSingleton().reLoadMainDict(),reLoadMainDict 内部通过 loadRemoteExtDict() 加载远程字典,再由 getRemoteWords 和 getRemoteWordsUnprivileged 获取词条——注意,获取词条的请求并不会带上面两个请求头。
package org.wltea.analyzer.dic;
public class Monitor implements Runnable {
.....
....
/**
 * Polls the remote dictionary with a conditional HEAD request and reloads the main
 * dictionary when the server reports a changed {@code Last-Modified} or {@code ETag}.
 *
 * <p>The previously seen validator values are sent as {@code If-Modified-Since} and
 * {@code If-None-Match}. A 200 response triggers a reload only if at least one returned
 * validator is present and differs from the stored one; a 304 response is a no-op; any
 * other status is logged. Failures are logged and never propagated to the caller.
 */
public void runUnprivileged() {
    RequestConfig rc = RequestConfig.custom()
            .setConnectionRequestTimeout(10 * 1000)
            .setConnectTimeout(10 * 1000)
            .setSocketTimeout(15 * 1000)
            .build();

    HttpHead head = new HttpHead(location);
    head.setConfig(rc);

    // Send the validators from the previous successful check, if any.
    if (last_modified != null) {
        head.setHeader("If-Modified-Since", last_modified);
    }
    if (eTags != null) {
        head.setHeader("If-None-Match", eTags);
    }

    CloseableHttpResponse response = null;
    try {
        response = httpclient.execute(head);
        int statusCode = response.getStatusLine().getStatusCode();

        if (statusCode == 200) {
            String newLastModified = response.getLastHeader("Last-Modified") == null
                    ? null
                    : response.getLastHeader("Last-Modified").getValue();
            String newETag = response.getLastHeader("ETag") == null
                    ? null
                    : response.getLastHeader("ETag").getValue();

            boolean lastModifiedChanged =
                    newLastModified != null && !newLastModified.equalsIgnoreCase(last_modified);
            boolean eTagChanged = newETag != null && !newETag.equalsIgnoreCase(eTags);

            if (lastModifiedChanged || eTagChanged) {
                // Reload first; the stored validators are only advanced after a reload was attempted.
                Dictionary.getSingleton().reLoadMainDict();
                last_modified = newLastModified;
                eTags = newETag;
            }
        } else if (statusCode == 304) {
            // Not modified: nothing to do.
        } else {
            logger.info("remote_ext_dict {} return bad code {}", location, statusCode);
        }
    } catch (Exception e) {
        // The location fills the placeholder; the exception goes last so its stack trace is logged.
        logger.error("remote_ext_dict {} error!", location, e);
    } finally {
        try {
            if (response != null) {
                response.close();
            }
        } catch (IOException e) {
            logger.error(e.getMessage(), e);
        }
    }
}
}
....
....
public class Dictionary {
...
...
/**
 * Rebuilds the main and stop-word dictionaries in a scratch {@code Dictionary} instance and
 * then points this instance's {@code _MainDict} and {@code _StopWords} at the freshly
 * loaded structures.
 */
void reLoadMainDict() {
    logger.info("start to reload ik dict.");

    // Load everything into a separate instance so the live dictionaries are only
    // replaced once loading has completed.
    Dictionary freshDict = new Dictionary(configuration);
    freshDict.configuration = getSingleton().configuration;
    freshDict.loadMainDict();
    freshDict.loadStopWordDict();

    _MainDict = freshDict._MainDict;
    _StopWords = freshDict._StopWords;

    logger.info("reload ik dict finished.");
}
/**
 * Creates a new main dictionary segment and fills it from the bundled main dictionary
 * file, then from the extension dictionaries and the remote extension dictionaries.
 */
private void loadMainDict() {
    _MainDict = new DictSegment((char) 0);

    Path mainDictPath = PathUtils.get(getDictRoot(), Dictionary.PATH_DIC_MAIN);
    loadDictFile(_MainDict, mainDictPath, false, "Main Dict");

    // Extension sources are layered on top of the bundled dictionary.
    loadExtDict();
    loadRemoteExtDict();
}
/**
 * Fetches every configured remote extension dictionary and adds its non-blank entries
 * (trimmed and lower-cased) to {@code _MainDict}.
 */
private void loadRemoteExtDict() {
    List<String> remoteLocations = getRemoteExtDictionarys();
    for (String location : remoteLocations) {
        logger.info("[Dict Loading] " + location);

        List<String> remoteWords = getRemoteWords(location);
        if (remoteWords == null) {
            logger.error("[Dict Loading] " + location + " load failed");
            continue;
        }

        for (String theWord : remoteWords) {
            // Skip null and whitespace-only lines.
            if (theWord == null) {
                continue;
            }
            String trimmed = theWord.trim();
            if ("".equals(trimmed)) {
                continue;
            }
            logger.info(theWord);
            _MainDict.fillSegment(trimmed.toLowerCase().toCharArray());
        }
    }
}
/**
 * Fetches the entries of a remote dictionary inside a privileged block.
 *
 * @param location URL of the remote dictionary
 * @return the lines returned by {@code getRemoteWordsUnprivileged(location)}
 */
private static List<String> getRemoteWords(String location) {
    SpecialPermission.check();
    PrivilegedAction<List<String>> fetchWords = () -> getRemoteWordsUnprivileged(location);
    return AccessController.doPrivileged(fetchWords);
}
}
/**
 * Downloads a remote dictionary and returns its content line by line.
 *
 * <p>The body is decoded with the charset named in the response's Content-Type header,
 * falling back to UTF-8. An empty list is returned when the server does not answer 200 or
 * sends no body. If reading fails, the error is logged and the lines read so far are
 * returned.
 *
 * @param location URL of the remote dictionary
 * @return the lines of the remote dictionary; never {@code null}
 */
private static List<String> getRemoteWordsUnprivileged(String location) {
    List<String> buffer = new ArrayList<String>();
    RequestConfig rc = RequestConfig.custom()
            .setConnectionRequestTimeout(10 * 1000)
            .setConnectTimeout(10 * 1000)
            .setSocketTimeout(60 * 1000)
            .build();
    HttpGet get = new HttpGet(location);
    get.setConfig(rc);

    // try-with-resources closes the client and the response on every path, including
    // failures while reading the body; previously the client was never closed at all.
    try (CloseableHttpClient httpclient = HttpClients.createDefault();
         CloseableHttpResponse response = httpclient.execute(get)) {
        if (response.getStatusLine().getStatusCode() != 200) {
            return buffer;
        }

        HttpEntity entity = response.getEntity();
        if (entity == null) {
            return buffer;
        }

        String charset = "UTF-8";
        Header contentType = entity.getContentType();
        if (contentType != null && contentType.getValue() != null) {
            String typeValue = contentType.getValue();
            if (typeValue.contains("charset=")) {
                charset = typeValue.substring(typeValue.lastIndexOf("=") + 1);
            }
        }

        // Only read when the server announced a body.
        if (entity.getContentLength() > 0 || entity.isChunked()) {
            try (BufferedReader in = new BufferedReader(new InputStreamReader(entity.getContent(), charset))) {
                String line;
                while ((line = in.readLine()) != null) {
                    buffer.add(line);
                }
            }
        }
    } catch (IllegalStateException | IOException e) {
        // The location fills the placeholder; the exception goes last so its stack trace is logged.
        logger.error("getRemoteWords {} error", location, e);
    }
    return buffer;
}
......
近义词插件的实现也差不多,就不多看了,简单贴一点代码。Git 地址:https://github.com/bells/elasticsearch-analysis-dynamic-synonym 。不过这里需要配置一下 analysis:
"analysis": {
"analyzer": {
"my_ik_max_word": {
"tokenizer": "ik_max_word",
"filter": ["remote_synonym"]
}
},
"filter": {
"remote_synonym": {
"type": "dynamic_synonym",
"synonyms_path": "http://xxxx/${type}/remote_dic.txt",
"interval": 30
}
}
}
/**
 * Periodic task that asks a {@link SynonymFile} whether its synonyms changed and, if so,
 * reloads the synonym map and pushes it to every registered dynamic synonym filter.
 */
public class Monitor implements Runnable {

    private SynonymFile synonymFile;

    Monitor(SynonymFile synonymFile) {
        this.synonymFile = synonymFile;
    }

    @Override
    public void run() {
        // Nothing to do while the synonym source is unchanged.
        if (!synonymFile.isNeedReloadSynonymMap()) {
            return;
        }

        synonymMap = synonymFile.reloadSynonymMap();
        for (AbsSynonymFilter filter : dynamicSynonymFilters.keySet()) {
            filter.update(synonymMap);
            logger.debug("success reload synonym");
        }
    }
}
public class RemoteSynonymFile implements SynonymFile {
...
...
/**
 * Checks with a conditional HEAD request whether the remote synonym file changed.
 *
 * <p>The stored validators are sent as {@code If-Modified-Since} and {@code If-None-Match}.
 * On a 200 response the file counts as changed when a returned {@code Last-Modified} or
 * {@code ETag} value is present and differs from the stored one; the stored values are then
 * updated. A missing header is treated as "no information" instead of causing a
 * {@link NullPointerException}.
 *
 * @return {@code true} if the synonyms should be reloaded, {@code false} otherwise
 *     (including 304 and unexpected status codes)
 */
@Override
public boolean isNeedReloadSynonymMap() {
    RequestConfig rc = RequestConfig.custom()
            .setConnectionRequestTimeout(10 * 1000)
            .setConnectTimeout(10 * 1000)
            .setSocketTimeout(15 * 1000)
            .build();
    HttpHead head = AccessController.doPrivileged((PrivilegedAction<HttpHead>) () -> new HttpHead(location));
    head.setConfig(rc);

    if (lastModified != null) {
        head.setHeader("If-Modified-Since", lastModified);
    }
    if (eTags != null) {
        head.setHeader("If-None-Match", eTags);
    }

    CloseableHttpResponse response = null;
    try {
        response = executeHttpRequest(head);
        int statusCode = response.getStatusLine().getStatusCode();

        if (statusCode == 200) {
            // Either validator may be absent, so read both null-safely before comparing.
            String newLastModified = response.getLastHeader(LAST_MODIFIED_HEADER) == null
                    ? null
                    : response.getLastHeader(LAST_MODIFIED_HEADER).getValue();
            String newETag = response.getLastHeader(ETAG_HEADER) == null
                    ? null
                    : response.getLastHeader(ETAG_HEADER).getValue();

            boolean lastModifiedChanged =
                    newLastModified != null && !newLastModified.equalsIgnoreCase(lastModified);
            boolean eTagChanged = newETag != null && !newETag.equalsIgnoreCase(eTags);

            if (lastModifiedChanged || eTagChanged) {
                lastModified = newLastModified;
                eTags = newETag;
                return true;
            }
        } else if (statusCode == 304) {
            return false;
        } else {
            logger.info("remote synonym {} return bad code {}", location, statusCode);
        }
    } finally {
        try {
            if (response != null) {
                response.close();
            }
        } catch (IOException e) {
            logger.error("failed to close http response", e);
        }
    }
    return false;
}
...
...
/**
 * Downloads the remote synonym file and returns its content as an in-memory reader.
 *
 * <p>The body is decoded with the charset named in the response's Content-Type header,
 * falling back to UTF-8 when the header is missing or names no charset. Each line is
 * re-joined with the platform line separator.
 *
 * @return a reader over the downloaded synonyms, or a reader over the empty string when
 *     the server does not answer 200, sends no entity, or the download fails
 */
public Reader getReader() {
    Reader reader;
    RequestConfig rc = RequestConfig.custom()
            .setConnectionRequestTimeout(10 * 1000)
            .setConnectTimeout(10 * 1000)
            .setSocketTimeout(60 * 1000)
            .build();
    CloseableHttpResponse response = null;
    BufferedReader br = null;
    HttpGet get = new HttpGet(location);
    get.setConfig(rc);
    try {
        response = executeHttpRequest(get);
        if (response.getStatusLine().getStatusCode() == 200 && response.getEntity() != null) {
            String charset = "UTF-8";
            // Content-Type is optional; without this check a server omitting it caused an
            // NPE that the catch below turned into an empty synonym set.
            String contentType = response.getEntity().getContentType() == null
                    ? null
                    : response.getEntity().getContentType().getValue();
            if (contentType != null && contentType.contains("charset=")) {
                charset = contentType.substring(contentType.lastIndexOf('=') + 1);
            }

            br = new BufferedReader(new InputStreamReader(response.getEntity().getContent(), charset));
            String lineSeparator = System.getProperty("line.separator");
            StringBuilder sb = new StringBuilder();
            String line;
            while ((line = br.readLine()) != null) {
                logger.debug("reload remote synonym: {}", line);
                sb.append(line).append(lineSeparator);
            }
            reader = new StringReader(sb.toString());
        } else {
            reader = new StringReader("");
        }
    } catch (Exception e) {
        logger.error("get remote synonym reader {} error!", location, e);
        reader = new StringReader("");
    } finally {
        try {
            if (br != null) {
                br.close();
            }
        } catch (IOException e) {
            logger.error("failed to close bufferedReader", e);
        }
        try {
            if (response != null) {
                response.close();
            }
        } catch (IOException e) {
            logger.error("failed to close http response", e);
        }
    }
    return reader;
}
}
2.实现
思路挺简单:存一个词库的最后修改时间,最后修改时间变了,说明词库有新增;再存一个重建分词的时间,如果最后修改时间大于重建分词时间,就需要重建一下分词。
/**
 * Entity for one row of the {@code ext_dict} table: a dictionary word with an optional
 * synonym, grouped by a numeric type.
 */
@Data
@TableName("ext_dict")
public class ExtDict extends BaseEntity {
/** Primary key, mapped to column {@code id} with id type {@code AUTO}. */
@TableId(value = "id", type = IdType.AUTO)
private Integer id;
/** The dictionary word; must not be null when validated with {@code AddGroup}. */
@NotNull(message = "热词不能为空", groups = { AddGroup.class})
private String word;
/**
 * Dictionary type. NOTE(review): the remote-dictionary endpoint in this file writes
 * type 1 entries as {@code word=>synonym}; other values are written as the plain word.
 * Confirm the full set of valid values.
 */
private Integer type;
/** Synonym text appended after {@code =>} for type 1 entries by the remote-dictionary endpoint. */
private String synonym;
}
/**
 * Serves the remote dictionary of the given type as plain text, one entry per line.
 *
 * <p>Every response carries {@code ETag} and {@code Last-Modified} headers; the
 * last-modified value is read from Redis and, when absent, loaded from the database and
 * cached. Requests that carry {@code If-None-Match} or {@code If-Modified-Since} are
 * treated as change checks and get the headers only. Other requests get the full word
 * list; afterwards an analysis rebuild is triggered when the dictionary is newer than the
 * last rebuild and no rebuild is currently running.
 *
 * @param type dictionary type; for type 1 each line is written as {@code word=>synonym}
 * @param request the incoming request, inspected for conditional headers
 * @param response the response the headers and dictionary are written to
 */
@GetMapping("/{type}/remote_dic.txt")
public void getRemotDic(@PathVariable("type") int type, HttpServletRequest request, HttpServletResponse response) {
    response.setContentType("text/plain");
    response.setCharacterEncoding("utf-8");

    String lastModifiedKey = REMOTE_DIC_LAST_MODIFY + type + ":";
    // The cache entry may be missing; calling toString() on it directly threw an NPE
    // before the fallback below could run.
    Object cachedLastModified = RedisUtils.getCacheObject(lastModifiedKey);
    String lastModified = cachedLastModified == null ? null : cachedLastModified.toString();
    if (StringUtils.isEmpty(lastModified)) {
        lastModified = extDictService.queryLastModified(type) + "";
        RedisUtils.setCacheObject(lastModifiedKey, lastModified);
    }
    response.setHeader("ETag", "xxxxxxxxxxxxxxxxdsa");
    response.setDateHeader("Last-Modified", Long.parseLong(lastModified));

    // getHeader matches header names case-insensitively, unlike a contains() check on the
    // raw header-name list.
    if (request.getHeader("If-None-Match") != null || request.getHeader("If-Modified-Since") != null) {
        return;
    }

    List<ExtDict> list = extDictService.queryListByType(type);
    PrintWriter writer = null;
    try {
        writer = response.getWriter();
        for (ExtDict entry : list) {
            // Format the line without mutating the entity.
            String line = type == 1 ? entry.getWord() + "=>" + entry.getSynonym() : entry.getWord();
            writer.write(line + "\n");
        }
        writer.flush();
    } catch (IOException e) {
        // NOTE(review): no logger is visible for this class in the snippet; replace with
        // a proper log call if one exists.
        e.printStackTrace();
    } finally {
        if (writer != null) {
            writer.close();
        }
    }

    String status = RedisUtils.getCacheObject(REBUILD_ANALYSIS_STATUS);
    String time = RedisUtils.getCacheObject(REBUILD_ANALYSIS_TIME);
    if (status == null) {
        // First request ever: initialise the rebuild bookkeeping and skip the rebuild.
        RedisUtils.setCacheObject(REBUILD_ANALYSIS_STATUS, ANALYSIS_STATUS_SUCCESS);
        RedisUtils.setCacheObject(REBUILD_ANALYSIS_TIME, System.currentTimeMillis() + "");
        return;
    }

    // Compare timestamps numerically; a missing rebuild time counts as "never rebuilt".
    boolean dictionaryNewerThanRebuild = time == null || Long.parseLong(lastModified) > Long.parseLong(time);
    if (!ANALYSIS_STATUS_UPDATING.equals(status) && dictionaryNewerThanRebuild) {
        elasticSearchService.rebuildAnalysis();
    }
}
/**
 * Starts an asynchronous update-by-query over the default index and tracks its progress
 * in Redis.
 *
 * <p>The status is set to UPDATING before the request is dispatched, so the listener's
 * SUCCESS or FAILED status can never be overwritten by a late UPDATING write. On success
 * the rebuild time is also recorded.
 */
@Override
public void rebuildAnalysis() {
    UpdateByQueryRequest request = new UpdateByQueryRequest(DEFULT_INDEX_NAME);
    request.setConflicts("proceed");
    request.setQuery(QueryBuilders.matchAllQuery());
    request.setRefresh(true);

    // Must happen before dispatch: the callback may run before this method returns.
    RedisUtils.setCacheObject(REBUILD_ANALYSIS_STATUS, ANALYSIS_STATUS_UPDATING);
    try {
        restHighLevelClient.updateByQueryAsync(request, RequestOptions.DEFAULT, new ActionListener<BulkByScrollResponse>() {
            @Override
            public void onResponse(BulkByScrollResponse bulkByScrollResponse) {
                log.info("------------------重建分词成功");
                RedisUtils.setCacheObject(REBUILD_ANALYSIS_STATUS, ANALYSIS_STATUS_SUCCESS);
                RedisUtils.setCacheObject(REBUILD_ANALYSIS_TIME, System.currentTimeMillis() + "");
            }

            @Override
            public void onFailure(Exception e) {
                log.error("----------- -----重建分词失败", e);
                RedisUtils.setCacheObject(REBUILD_ANALYSIS_STATUS, ANALYSIS_STATUS_FAILED);
            }
        });
    } catch (RuntimeException e) {
        // If dispatch itself fails, no callback will run, so do not leave the status
        // stuck at UPDATING.
        RedisUtils.setCacheObject(REBUILD_ANALYSIS_STATUS, ANALYSIS_STATUS_FAILED);
        throw e;
    }
}
|