遇到一个针对业务数据实现全文搜素的需求,而数据存在于pgsql数据库中,因此要将数据库中的数据同步到ES里。有两种方式可以实现:一是通过ES的API进行增删改查,二是通过中间件来进行数据全量、增量的同步。由于调用API的方式比较麻烦,本文便为第二种实现方式。
刚开始时是准备使用multicorn
插件的,奈何在windows 10的环境下一直都无法进行make&&make install
安装,而由于环境问题也无法使用ABC工具(不支持postgresql14),随即作罢,改换成实时数据采集引擎logstash
来同步数据。
logstash
概要
Logstash
作为Elasicsearch常用的实时数据采集引擎,可以采集来自不同数据源的数据,并对数据进行处理后输出到多种输出源,是Elastic Stack 的重要组成部分。官网网址为:Logstash
其数据处理的过程包括Inputs、Filters、Outputs三部分,在Inputs和Outputs中可以使用Codecs对数据格式进行处理。这四个部分均是以插件形式存在,用户通过定义pipeline配置文件,设置需要使用的input、filter、output、codec插件以实现特定的数据采集、数据处理、数据输出等功能。
- Inputs:用于从数据源获取数据,常见的插件如
file
、syslog
、redis
、beats
等 - Filters:用于处理数据如格式转换、数据派生等,常见的插件如
grok
、mutate
、drop
、clone
、geoip
等 - Outputs:用于数据输出,常见的插件如
elasticsearch
、file
、graphite
、statsd
等 Codecs
:Codecs
不是一个单独的流程,而是在输入和输出等插件中用于数据转换的模块,用于对数据进行编码处理,常见的插件如json
、multiline
等
在网页中下载好对应版本的可运行文件并解压,将解压目录增加到环境变量中,即可在cmd命令行里运行。
安装配置
本文环境要求
系统版本:window10
JDK版本:1.8.0_351(要求jdk版本在1.8以上)
Elasticsearch版本:5.5.3
Postgresql版本:14.5
安装--单表同步配置
安装logstash:5.5.3(ELK版本要求保持一致),下载网址:logstash 5.5.3
将安装包解压后,进入解压目录
logstash-5.5.3\bin
后,新建pgsql文件夹(名字随意)下载连接postgreSQL数据库的pg引擎,即与JDK版本对应的jdbc驱动:Java 8-42.5.1,解压后放到pgsql文件夹中
在pgsql文件夹中新建文件
logstash-pgsql.conf
,内容如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51input {
stdin {
}
jdbc {
# 数据库连接地址
jdbc_connection_string => "jdbc:postgresql://127.0.0.1:5432/postgres"
# 数据库连接账号密码:
jdbc_user => "***"
jdbc_password => "******"
# Pg依赖包路径
jdbc_driver_library => "pgsql\postgresql-42.5.1.jar"
jdbc_driver_class => "org.postgresql.Driver"
# 开启分页查询,默认为false
jdbc_paging_enabled => "true"
# 单次分页查询条数(默认100000,若字段较多且更新频率较高,则建议调低此值)
jdbc_page_size => "300000"
use_column_value => "true"
tracking_column => "id" #根据id字段更新
# statement为查询数据sql,若sql较复杂,建议通过statement_filepath 配置sql文件的存放路径;
# statement => "SELECT * FROM public.表名"
statement_filepath => "pgsql\logstash-pgsql.sql" #这是要操作的sql 表,下面会提供
# 同步频率(分时天月年),默认为每分钟同步一次
schedule => "* * * * *"
type => "jdbc"
jdbc_default_timezone =>"Asia/Shanghai"
}
}
filter {
json {
source => "message"
remove_field => ["message"]
}
}
output {
elasticsearch {
# 配置es集群的地址
hosts => ["localhost:9200"]
# 索引名称,必须小写
index => "test"
# 模板地址
template => "pgsql\es-template.json" #es 索引模板,可不定义
template_name => "t-statistic-out-logstash"
template_overwrite => true
document_type => "out"
# 数据唯一索引
document_id => "%{id}"
}
stdout {
codec => json_lines
}
}在pgsql文件夹中新建文件
logstash-pgsql.sql
,内容如下:1
2SELECT * FROM public."表名"
ORDER BY id ASC在pgsql文件夹中新建文件
es-template.json
,内容如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41{
"template" : "t-statistis-out-template",
"order":1,
"settings": {
"index": {
"refresh_interval": "5s"
}
},
"mappings": {
"_default_": {
"_all" : {"enabled":false},
"dynamic_templates": [
{
"message_field" : {
"match" : "message",
"match_mapping_type" : "string",
"mapping" : { "type" : "string", "index" : "not_analyzed" }
}
}, {
"string_fields" : {
"match" : "*",
"match_mapping_type" : "string",
"mapping" : { "type" : "string", "index" : "not_analyzed" }
}
}
],
"properties": { //将id、name、tag更换为自己表中的表头,其中keyword为Elasticsearch中用于索引结构化内容的字段
"id": {
"type": "integer"
},
"name": {
"type": "keyword"
},
"tag": {
"type": "keyword"
}
}
}
},
"aliases": {}
}打开cmd,将目录切换到
logstash-5.5.3\bin
,输入命令启动logstash:1
logstash -f ./pgsql/logstash-pgsql.conf
成功启动的截图如下:
安装--多表同步配置
多表配置和单表配置的区别在于input模块的jdbc模块有几个type,output模块就需对应有几个type:
logstash-pgsql.conf
文件的内容如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78input {
stdin {
}
jdbc {
# 数据库连接地址
jdbc_connection_string => "jdbc:postgresql://127.0.0.1:5432/postgres"
# 数据库连接账号密码:
jdbc_user => "***"
jdbc_password => "******"
# Pg依赖包路径
jdbc_driver_library => "pgsql\postgresql-42.5.1.jar"
jdbc_driver_class => "org.postgresql.Driver"
# 开启分页查询,默认为false
jdbc_paging_enabled => "true"
# 单次分页查询条数(默认100000,若字段较多且更新频率较高,则建议调低此值)
jdbc_page_size => "300000"
use_column_value => "true"
tracking_column => "id" #根据id字段更新
# statement为查询数据sql,若sql较复杂,建议通过statement_filepath 配置sql文件的存放路径;
# statement => "SELECT * FROM public.test1"
statement_filepath => "pgsql\logstash-pgsql1.sql" #这是要操作的sql 表,下面会提供
# 同步频率(分时天月年),默认为每分钟同步一次
schedule => "* * * * *"
# 多表同步时,表类型区分,建议命名为“库名_表名”,每个jdbc模块需对应一个type;
type => "postgres_test1"
jdbc_default_timezone =>"Asia/Shanghai"
}
jdbc {
#其他配置此处省略,详情参考前一个jdbc的配置
# ...
# 多表同步时,表类型区分,建议命名为“库名_表名”,每个jdbc模块需对应一个type;
type => "postgres_test2"
jdbc_default_timezone =>"Asia/Shanghai"
}
}
filter {
json {
source => "message"
remove_field => ["message"]
}
}
output {
# output模块的type需和jdbc模块的type一致
if [type] == "postgres_test1"{
elasticsearch {
# 配置es集群的地址
hosts => ["localhost:9200"]
# 索引名称,必须小写
index => "test"
# 模板地址
template => "pgsql\es-template.json" #es 索引模板,可不定义
template_name => "t-statistic-out-logstash"
template_overwrite => true
document_type => "out"
# 数据唯一索引
document_id => "%{id}"
}
}
if [type] == "postgres_test2"{
elasticsearch {
# 配置es集群的地址
hosts => ["localhost:9200"]
# 索引名称,必须小写
index => "test"
# 模板地址
template => "pgsql\es-template.json" #es 索引模板,可不定义
template_name => "t-statistic-out-logstash"
template_overwrite => true
document_type => "out"
# 数据唯一索引
document_id => "%{id}"
}
}
stdout {
codec => json_lines
}
}