唐抉的个人博客

实现Postgresql与Elasticsearch的数据同步

字数统计: 1.8k阅读时长: 8 min
2023/01/17

遇到一个针对业务数据实现全文搜素的需求,而数据存在于pgsql数据库中,因此要将数据库中的数据同步到ES里。有两种方式可以实现:一是通过ES的API进行增删改查,二是通过中间件来进行数据全量、增量的同步。由于调用API的方式比较麻烦,本文便为第二种实现方式。

刚开始时是准备使用multicorn插件的,奈何在windows 10的环境下一直都无法进行make&&make install安装,而由于环境问题也无法使用ABC工具(不支持postgresql14),随即作罢,改换成实时数据采集引擎logstash来同步数据。

logstash概要

Logstash作为Elasicsearch常用的实时数据采集引擎,可以采集来自不同数据源的数据,并对数据进行处理后输出到多种输出源,是Elastic Stack 的重要组成部分。官网网址为:Logstash

其数据处理的过程包括Inputs、Filters、Outputs三部分,在Inputs和Outputs中可以使用Codecs对数据格式进行处理。这四个部分均是以插件形式存在,用户通过定义pipeline配置文件,设置需要使用的input、filter、output、codec插件以实现特定的数据采集、数据处理、数据输出等功能。

  • Inputs:用于从数据源获取数据,常见的插件如filesyslogredisbeats
  • Filters:用于处理数据如格式转换、数据派生等,常见的插件如grokmutatedropclonegeoip
  • Outputs:用于数据输出,常见的插件如elasticsearchfilegraphitestatsd
  • CodecsCodecs不是一个单独的流程,而是在输入和输出等插件中用于数据转换的模块,用于对数据进行编码处理,常见的插件如jsonmultiline

网页中下载好对应版本的可运行文件并解压,将解压目录增加到环境变量中,即可在cmd命令行里运行。

安装配置

本文环境要求

系统版本:window10

JDK版本:1.8.0_351(要求jdk版本在1.8以上)

Elasticsearch版本:5.5.3

Postgresql版本:14.5

安装--单表同步配置

  • 安装logstash:5.5.3(ELK版本要求保持一致),下载网址:logstash 5.5.3

  • 将安装包解压后,进入解压目录logstash-5.5.3\bin后,新建pgsql文件夹(名字随意)

  • 下载连接postgreSQL数据库的pg引擎,即与JDK版本对应的jdbc驱动:Java 8-42.5.1,解压后放到pgsql文件夹中

  • 在pgsql文件夹中新建文件logstash-pgsql.conf,内容如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    input {
    stdin {
    }
    jdbc {
    # 数据库连接地址
    jdbc_connection_string => "jdbc:postgresql://127.0.0.1:5432/postgres"
    # 数据库连接账号密码:
    jdbc_user => "***"
    jdbc_password => "******"
    # Pg依赖包路径
    jdbc_driver_library => "pgsql\postgresql-42.5.1.jar"
    jdbc_driver_class => "org.postgresql.Driver"
    # 开启分页查询,默认为false
    jdbc_paging_enabled => "true"
    # 单次分页查询条数(默认100000,若字段较多且更新频率较高,则建议调低此值)
    jdbc_page_size => "300000"
    use_column_value => "true"
    tracking_column => "id" #根据id字段更新
    # statement为查询数据sql,若sql较复杂,建议通过statement_filepath 配置sql文件的存放路径;
    # statement => "SELECT * FROM public.表名"
    statement_filepath => "pgsql\logstash-pgsql.sql" #这是要操作的sql 表,下面会提供
    # 同步频率(分时天月年),默认为每分钟同步一次
    schedule => "* * * * *"
    type => "jdbc"
    jdbc_default_timezone =>"Asia/Shanghai"
    }
    }
    filter {
    json {
    source => "message"
    remove_field => ["message"]
    }
    }
    output {
    elasticsearch {
    # 配置es集群的地址
    hosts => ["localhost:9200"]
    # 索引名称,必须小写
    index => "test"
    # 模板地址
    template => "pgsql\es-template.json" #es 索引模板,可不定义
    template_name => "t-statistic-out-logstash"
    template_overwrite => true
    document_type => "out"
    # 数据唯一索引
    document_id => "%{id}"
    }
    stdout {
    codec => json_lines
    }
    }
  • 在pgsql文件夹中新建文件logstash-pgsql.sql,内容如下:

    1
    2
    SELECT * FROM public."表名"
    ORDER BY id ASC

  • 在pgsql文件夹中新建文件es-template.json,内容如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    {
    "template" : "t-statistis-out-template",
    "order":1,
    "settings": {
    "index": {
    "refresh_interval": "5s"
    }
    },
    "mappings": {
    "_default_": {
    "_all" : {"enabled":false},
    "dynamic_templates": [
    {
    "message_field" : {
    "match" : "message",
    "match_mapping_type" : "string",
    "mapping" : { "type" : "string", "index" : "not_analyzed" }
    }
    }, {
    "string_fields" : {
    "match" : "*",
    "match_mapping_type" : "string",
    "mapping" : { "type" : "string", "index" : "not_analyzed" }
    }
    }
    ],
    "properties": { //将id、name、tag更换为自己表中的表头,其中keyword为Elasticsearch中用于索引结构化内容的字段
    "id": {
    "type": "integer"
    },
    "name": {
    "type": "keyword"
    },
    "tag": {
    "type": "keyword"
    }
    }
    }
    },
    "aliases": {}
    }

  • 打开cmd,将目录切换到logstash-5.5.3\bin,输入命令启动logstash:

    1
    logstash -f ./pgsql/logstash-pgsql.conf

  • 成功启动的截图如下:

安装--多表同步配置

多表配置和单表配置的区别在于input模块的jdbc模块有几个type,output模块就需对应有几个type:

  • logstash-pgsql.conf文件的内容如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    input {
    stdin {
    }
    jdbc {
    # 数据库连接地址
    jdbc_connection_string => "jdbc:postgresql://127.0.0.1:5432/postgres"
    # 数据库连接账号密码:
    jdbc_user => "***"
    jdbc_password => "******"
    # Pg依赖包路径
    jdbc_driver_library => "pgsql\postgresql-42.5.1.jar"
    jdbc_driver_class => "org.postgresql.Driver"
    # 开启分页查询,默认为false
    jdbc_paging_enabled => "true"
    # 单次分页查询条数(默认100000,若字段较多且更新频率较高,则建议调低此值)
    jdbc_page_size => "300000"
    use_column_value => "true"
    tracking_column => "id" #根据id字段更新
    # statement为查询数据sql,若sql较复杂,建议通过statement_filepath 配置sql文件的存放路径;
    # statement => "SELECT * FROM public.test1"
    statement_filepath => "pgsql\logstash-pgsql1.sql" #这是要操作的sql 表,下面会提供
    # 同步频率(分时天月年),默认为每分钟同步一次
    schedule => "* * * * *"
    # 多表同步时,表类型区分,建议命名为“库名_表名”,每个jdbc模块需对应一个type;
    type => "postgres_test1"
    jdbc_default_timezone =>"Asia/Shanghai"
    }
    jdbc {
    #其他配置此处省略,详情参考前一个jdbc的配置
    # ...

    # 多表同步时,表类型区分,建议命名为“库名_表名”,每个jdbc模块需对应一个type;
    type => "postgres_test2"
    jdbc_default_timezone =>"Asia/Shanghai"
    }
    }
    filter {
    json {
    source => "message"
    remove_field => ["message"]
    }
    }
    output {
    # output模块的type需和jdbc模块的type一致
    if [type] == "postgres_test1"{
    elasticsearch {
    # 配置es集群的地址
    hosts => ["localhost:9200"]
    # 索引名称,必须小写
    index => "test"
    # 模板地址
    template => "pgsql\es-template.json" #es 索引模板,可不定义
    template_name => "t-statistic-out-logstash"
    template_overwrite => true
    document_type => "out"
    # 数据唯一索引
    document_id => "%{id}"
    }
    }
    if [type] == "postgres_test2"{
    elasticsearch {
    # 配置es集群的地址
    hosts => ["localhost:9200"]
    # 索引名称,必须小写
    index => "test"
    # 模板地址
    template => "pgsql\es-template.json" #es 索引模板,可不定义
    template_name => "t-statistic-out-logstash"
    template_overwrite => true
    document_type => "out"
    # 数据唯一索引
    document_id => "%{id}"
    }
    }
    stdout {
    codec => json_lines
    }
    }

CATALOG
  1. 1. logstash概要
  2. 2. 安装配置
    1. 2.1. 本文环境要求
    2. 2.2. 安装--单表同步配置
    3. 2.3. 安装--多表同步配置