大家好,欢迎来到IT知识分享网。
我使用的ElasticSearch是2.3.3版本,同步数据库使用插件是elasticsearch-jdbc-2.3.3.0,这里请注意,针对ElasticSearch版本需要使用对应的插件.下面是插件下载地址.
https://github.com/jprante/elasticsearch-jdbc
环境变量配置
安装好ElasticSearch后,将下载的插件放到你喜欢的任意盘中(不需要安装).
然后配置环境变量,在系统变量中新增一个变量,名字自定义(路径指向你的插件地址)
这个插件有点坑的地方就是你同步数据库的时候需要使用jdk1.8,然后同步完后可以将环境变量改成你原来的jdk版本。但是不能卸载,不然同步就会停止。
脚本文件编写
安装完JDK1.8后在任意盘创建一个文件夹,创建一个.sh后缀文件,同步执行的就是这个shell脚本文件。内容如下(贴上的代码显示不正常,就使用图片了):
图片橘黄色部分是你新增环境变量名字。最后一行的statefile.json文件新增在这个执行文件同一路径下。代码内容如下:
{
“type” : “jdbc”,
“jdbc”: {
“schedule”: “0/20 0-59 0-23 ? * *”,
“elasticsearch.autodiscover”:true,
“elasticsearch.cluster”:”elasticsearch”,
“driver”:”com.mysql.jdbc.Driver”,
“url”:”jdbc:mysql:localhost:3306/topic_test2”,
“user”:”“,
“password”:”“,
“sql”:”select v.id as _id,v.id as id,v.name as name,unix_timestamp(v.createTime) as createTime from videoInfo v”,
“elasticsearch” : {
“host” : “10.0.1.2”,
“port” : 9300
},
“index” : “original”,
“type” : “original”
}
}
其中schedule指的是同步刷新的时间间隔,可以指定多长时间同步一次。有关schedule的参数配置可以点击下面路径查看:
http://www.cnblogs.com/lihaiming93/p/6619124.html
elasticsearch.cluster 这个会自动查找你的elasticsearch是否有集群,我做测试是没有搭集群,可以在你安装的elasticsearch2.3.3/config/elasticsearch.yml文件配置,修改这个文件,找到cluster.name,去掉前面的#号,将值改为elasticsearch。
这里建议在写sql时把ES上的 _id与数据库id对应。这样方便搜索对应的数据。
然后填写你的数据库基本信息,我是用的mysql这里写sql语句的时间类型是个坑,最好可以转成时间戳,这样方便搜索是对时间排序。当时我需要同步的表设计时时间类型定为了varchar,同步到ES中就认定成了字符串(这里可以测试一下如果数据库是正常时间类型,同步过来是否可以正常排序)。
下面有关ES的配置就可以按照你自己的需求来填写。index和type填写后会自动为你创建,不需要你主动创建。到这里基本上已经成功配置完了。
脚本文件运行
在window下运行shell文件需要下载Git Bash。下载完成后运行.sh文件。运行后会在.sh文件路径下产生一个日志文件,如果失败可以在日志中查看问题。
到这里基本已经完成了对数据库的同步,这里要记住,运行.sh文件的窗口不能关闭,关闭就停止同步了(关闭后重新执行这个文件就可以恢复同步)。
然后这个插件坑点还有不能同步删除数据库信息(数据库删除后,ES上不能删除),能同步增改。
删除ES上冗余数据(java代码实现)
由于ES不能同步删除这个坑,我自己写了一个工具方法()用来删除ES上存在但数据库中却删除了的冗余数据(可以自己弄个定时器什么的每天删除)。
首先如果你的同步的数据库数据量比较大的话,在java中,如果使用set.size()方法的话,每次最多能搜索出1万条数据,这样对较大数据量时就显得不够用。所以在这里使用的是滚动搜索。代码如下:
这个方法是设置每次滚动搜索的数量,然后需要对id进行排序好与数据库中数据做判断。
public Map<String,Object> searchByScroll() {
Map<String,Object> map = new HashMap<String,Object>();
List<Integer> list = new ArrayList<Integer>();
String index = "original";
String type = "original";
// 搜索条件
SearchRequestBuilder searchRequestBuilder = client.prepareSearch();
searchRequestBuilder.setIndices(index);
searchRequestBuilder.setTypes(type);
searchRequestBuilder.addSort("id", SortOrder.ASC);
searchRequestBuilder.setSize(1000);//设置每次滚动搜索数量
searchRequestBuilder.setScroll(new TimeValue(30000));//设置滚动搜索有效时长
// 执行
SearchResponse searchResponse = searchRequestBuilder.get();
String scrollId = searchResponse.getScrollId();
SearchHit[] searchHits = searchResponse.getHits().getHits();
for (SearchHit searchHit : searchHits) {
list.add(Integer.valueOf(searchHit.getSource().get("id") + ""));
}
map.put("list", list);
map.put("scrollId", scrollId);
return map;
}
根据上面方法传过来的scrollId来获取所有的数据
public List<Integer> searchByScrollId(Map<String,Object> map) {
List<Integer> list = (List<Integer>) map.get("list");
String scrollId = (String) map.get("scrollId");
TimeValue timeValue = new TimeValue(10000);
SearchScrollRequestBuilder searchScrollRequestBuilder;
SearchResponse response;
// 结果
while (true) {
searchScrollRequestBuilder = client.prepareSearchScroll(scrollId);
// 重新设定滚动时间
searchScrollRequestBuilder.setScroll(timeValue);
// 请求
response = searchScrollRequestBuilder.get();
// 每次返回下一个批次结果 直到没有结果返回时停止 即hits数组空时
if (response.getHits().getHits().length == 0) {
break;
} // if
// 这一批次结果
SearchHit[] searchHits = response.getHits().getHits();
System.out.println(searchHits.length);
for (SearchHit searchHit : searchHits) {
list.add(Integer.valueOf(searchHit.getSource().get("id") + ""));
} // for
// 只有最近的滚动ID才能被使用
scrollId = response.getScrollId();
} // while
clearScroll(scrollId);
return list;
}
这个方法就是同步删除的工具方法,需要ES和数据库数据id排序一致,原理是id循环比较,id相同就继续,不同就跳过。
public boolean dropCommon() throws UnknownHostException{
ESUtil es = new ESUtil();
List<String> original = new Infomation().originalInfo();//获取数据库所有数据id,需要根据id排序
List<Integer> error = new ArrayList<Integer>();
int size = 0;
while(size==0){
int index = 0;
int original_index = 0;
List<Integer> all = es.searchByScrollId(es.searchByScroll());//获取ES所有数据id,需要根据id排序
for(int i=0;i<all.size();i++){
index++;
if(all.get(i)==Integer.parseInt(original.get(original_index))){
original_index++;
}else{
error.add(all.get(i));//这里就是冗余数据id
}
}
System.out.println(error.size());
break;
}
return false;
}
文章就到这里了,希望能对想要了解这方面的有帮助。
参考文献http://blog.csdn.net/laoyang360/article/details/51694519
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/14925.html