ES和LogStash结合导入mysql数据

我们安装和启动好ES,学会了ES的用法后,现在要应用到我们实际的项目中,我们如何将现有的数据在ES中创建索引呢?下面将介绍两种方式,通过ES的api直接索引和通过LogStash组件进行索引。

通过ES的API进行索引

本示例我们采用的Java语言开发,使用maven构建的工程,我们先把api相关依赖加入pom.xml

1
2
3
4
5
6
7
8
9
10
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.12.1</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>7.12.1</version>
</dependency>

创建索引

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 创建高版本客户端
RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(new HttpHost("localhost", 9200, "http")));
// 创建索引对象
IndexRequest indexRequest = new IndexRequest("posts")
.index("db_test")
.type("user")
.id("5")
.source("user", "kimchy",
"postDate", null,
"message", "trying out Elasticsearch");
// 调用客户端创建索引
IndexResponse index = client.index(indexRequest, RequestOptions.DEFAULT);
// 关闭索引
client.close();

通过API创建索引的逻辑比较简单,关于查询索引,删除索引,更新索引请参考官方文档:https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/index.html

LogStash导入数据

下载安装LogStash

下面我们先下载安装LogStash,下载地址:https://www.elastic.co/cn/downloads/logstash

1
2
wget https://artifacts.elastic.co/downloads/logstash/logstash-7.12.1-linux-x86_64.tar.gz
tail -zxvf logstash-7.12.1-linux-x86_64.tar.gz

运行测试

1
2
cd logstash-7.12.1
./bin/logstash -e 'input { stdin { } } output { stdout {} }'

运行完成后,我们控制台输入hello logstash。

1
2
3
4
5
6
7
8
The stdin plugin is now waiting for input:
hello logstash
{
"@version" => "1",
"@timestamp" => 2020-05-14T03:56:07.035Z,
"message" => "hello logstash",
"host" => "apple"
}

上面执行过程就是:输入(hello logstash)》通过logstash管道经过多个过滤器加工数据》输出结果。

导入mysql数据

为了方便演示,我创建了一个tb_article表,并初始化了一些数据:

1
2
3
4
5
6
7
8
9
10
create table tb_article(
id int(11) auto_increment primary key ,
article varchar(128) comment '文章',
tilte varchar(64) comment '标题',
author varchar(16) comment '作者'
);

insert into tb_article(article, tilte, author) VALUES ('所有人都要得死,所有人都要奉承。这是出自《权利的游戏》电影中人们常说的一句话。','名言','张三'),
('现在写在代码就是为了将来可以可写代码。','哲理','李四'),
('当发生雪崩时,没有一片雪花是无辜的;这个观点的角度应该是上帝视角。','哲理','张三');

导入mysql的数据前,我们要准备mysql驱动,存放位置要写到下面的配置中

编写logstash数据处理配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# 创建一个conf文件,将下面配置写入
vi config/mysql.conf

input {
stdin {}
jdbc {
# mysql 数据库链接,shop为数据库名
jdbc_connection_string => "jdbc:mysql://172.21.24.215:3306/article"
# 用户名和密码
jdbc_user => "root"
jdbc_password => "root"
# 驱动
jdbc_driver_library => "/root/Downloads/logstash-7.12.1/lib/mlib/mysql-connector-java-8.0.24/mysql-connector-java-8.0.24.jar"
# 驱动类名
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "100"
# 执行的sql 文件路径+名称
#statement_filepath => "/root/Downloads/logstash-7.12.1/config/sql/tbinfo.sql"
statement => "select * from tb_article"
# 设置监听间隔 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
schedule => "* * * * *"
# 索引类型
type => "article"
}
}
filter {
json {
source => "message"
remove_field => ["message"]
}
}
output {
if [type]=="article"{
elasticsearch {
hosts => ["localhost:9200"]
index => "article"
document_id => "%{id}"
}
}
stdout {
codec => "json_lines"
}
}

启动logstash

1
./bin/logstash -f config/mysql.conf

日志会输出导入ES的json数据。

1
2
3
4
5
6
{"@version":"1","type":"article","@timestamp":"2020-05-14T07:32:08.869Z","id":"1","article":"所有人都要得死,所有人都要奉承。这是出自《权利的游戏》电影中人们常说的一句话。","title":"名言","author":"张三"}

{"@version":"1","type":"article","@timestamp":"2020-05-14T07:32:08.869Z","id":"1","article":"现在写在代码就是为了将来可以可写代码。","title":"哲理","author":"李四"}

{"@version":"1","type":"article","@timestamp":"2020-05-14T07:32:08.869Z","id":"1","article":"当发生雪崩时,没有一片雪花是无辜的;这个观点的角度应该是上帝视角。","title":"哲理","author":"张三"}

到此我们已经将mysql的数据导入了ES中。

通过binlog进行同步

参考文章:https://cloud.tencent.com/document/product/845/35562

怎么快速把mysql的大数据量导入ES?

线上有个场景,就是有两千多万的一个订单数据,通过上述的mysql导入太慢,每秒导入100条数据,大约需要56小时,耗时主要是jdbc的IO和ES的写入数据IO。
优化方案是减少网络IO:

  1. 先将数据库数据导出cvs文件中,避免jdbc的IO。
  2. 批量写入ES,减少IO次数。

导出mysql数据:

1
2
mysql> SELECT * FROM tb_article 
-> INTO OUTFILE 'tb_article.cvs';

增加logstash的cvs导入配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
input {
file {
path => ["/Users/atomic/Desktop/tb_article.csv"]
start_position => "beginning",
type => "article"
}
}
filter {
csv{
separator => ","
columns => ["id","article","title","author"]
skip_empty_columns => true
remove_field => ["host", "tags", "path", "message"]
}
mutate{
convert => {
"id" => "integer"
"article" => "string"
"title" => "string"
"author" => "string"
}
}
}
output {
if [type]=="article"{
elasticsearch {
hosts => ["localhost:9200"]
index => "article"
document_id => "%{id}"
}
}
stdout {
codec => "json_lines"
}
}

执行导入:

1
./bin/logstash -f config/input/tb_artical.conf

此时,每秒写入约1W数据,处理两千万数据只需要半个小时,在可接受范围内。

参考文章