Mysql数据库binlog系列四-Mysql到ES的同步

如果我们的应用数据是按照某个唯一唯独进行分布式存储,而实际查询的角度有多个时,一般有两种选择的方案,一是按不同角度存储关系表,通过关系表进行二次查询,二是根据不同的查询角度进行全量的荣誉存储;

这两种方案可以从逻辑上解决问题,但是也会带来延伸的问题,如何处理冗余数据的存储又需要我们进行考虑,一般来说从多角度进行查询无法保证数据分布均匀,比如满足用户查询A系统自己的交易记录,需要以用户的角度进行冗余,而热点用户的存在会影响到某个库或表,反过来也会有库和表因为用户记录太少而白白浪费了资源。

借助全文搜索引擎,我们可以从根本上解决这个问题,ES完全可以满足性能要求和存储量的要求,我们在数据冷热分区的基础上添加了半热的概念,用ES查询满足一年半内数据的部分查询需求,目前选择通过ES来存储关联关系,以屏蔽产品更替频繁带来的字段更替问题。

缺陷:ES同步是基于sql的同步,虽然我们在项目中已经预留了updateTime字段,但是在后续开发中维护的程度堪忧,新项目、Mybatis项目修改起来还容易,hibernate、JPA用户还是需要仔细分析现有代码不要遗漏

Master配置

创建用于同步的用户
1
CREATE USER 'username' IDENTIFIED BY '123456';
赋予同步用户查询权限
1
GRANT SELECT ON *.* TO 'username'@'%';

LogStash配置

只讨论配置,搭建部分请回到主页搜关键字ELK

input配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
input{
jdbc {
clean_run => false
columns_charset => {“column0”=>“UTF-8”}
connection_retry_attempts => 2
connection_retry_attempts_wait_time => 0.5
jdbc_connection_string =>“jdbc:mysql:// localhost:3306 / mydb”
jdbc_default_timezone => "Asia/Shanghai"
jdbc_driver_class =>“com.mysql.jdbc.Driver”
jdbc_driver_library =>“mysql-connector-java-5.1.36-bin.jar”
jdbc_fetch_size => 10000
jdbc_page_size => 100000
jdbc-jdbc_paging_enabled => true
jdbc_password => "123456"
jdbc_password_filepath => /usr/password
jdbc_pool_timeout => 5
jdbc_user =>“mysql”
jdbc_validate_connection => true
jdbc_validation_timeout => 3600
last_run_metadata_path => "$HOME/.logstash_jdbc_last_run"
lowercase_column_names => true
parameters => {“id”=>“ABCDE”}
record_last_run => true
schedule => “* * * * *”
sequel_opts => {“login_timeout”=>“5”}
sql_log_level => “INFO”
statement =>“SELECT * from dual where id =:id”
tracking_column => “id”
tracking_column_type => “timestamp”
use_column_value => true
}
}
  • clean_run:是否应保留之前的运行状态,默认false
  • columns_charset:指定编码类型
  • connection_retry_attempts:尝试连接数据库的最大次数
  • connection_retry_attempts_wait_time:连接尝试之间休眠的秒数
  • jdbc_connection_string:链接信息,必选
  • jdbc_default_timezone:时区转换,将SQL时间戳转换为Logstash时间戳*
  • jdbc_driver_class:驱动类,必选(oracle填Java::oracle.jdbc.driver.OracleDriver)
  • jdbc_driver_library:插件和JDBC驱动没有直接打包,需要在此配置,可以通过逗号进行分隔,配置多个
  • jdbc_fetch_size:一次获取数据的大小,如果不指定,则默认使用JDBC驱动的默认大小
  • jdbc_page_size:JDBCpage的大小,默认100000
  • jdbc_paging_enabled:SQl语句会被分解成多次查询,每次查询使用limits和offsets组成完整的结果集,现在大小为jdbc_page_size
  • jdbc_password:密码
  • jdbc_password_filepath:密码文件名
  • jdbc_pool_timeout:连接池获取连接时抛出超时异常的最大值
  • jdbc_user:链接用户,必选
  • jdbc_validate_connection:连接池链接前先进行验证链接
  • jdbc_validation_timeout:验证链接的频率,默认3600
  • last_run_metadata_path:上次运行时的文件路径
  • lowercase_column_names:是否强制标识符字段小写
  • parameters:查询参数设置
  • record_last_run:是否保存状态
  • schedule:使用cron语法描述定时计划,结果集中的每一行都会成为一个事件,列将转换为事件中的字段,没有指定计划则只运行一次
  • sequel_opts:针对数据库服务的参数配置
  • sql_log_level:记录SQL查询的日志级别(fatal,error,warn,info,debug)
  • statement:配置SQL语句,可以使用命名参数语法
  • statement_filepath:从文件中配置SQL语句,该选项只能支持一个SQL(多个SQL需要配置多个logstash配置文件)
  • tracking_column:需要追踪的列
  • tracking_column_type:跟踪列的类型(numeric,timestamp)
  • use_column_value:为true时使用tracking_column列作为sql_last_value,为false时使用上次执行查询的时间

    通用LogStash配置
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    input{
    jdbc {
    add_field => {“id”=>“ABCDE”}
    codec => "json"
    enable_metric => true
    id => “MySQL_DB_ID”
    tags => ["TAG_DB","TAG_DATA"]
    type => "TYPE_DB"
    }
    }
  • add_field:向事件中添加字段

  • codec:解码器,指定数据格式
  • enable_metric:是否启用metric日志
  • id:设置唯一的标识,如果不设置则默认生成
  • tags:设置标签,便于管理和分类
  • type:用于过滤器的激活
    输出配置
    1
    2
    3
    4
    5
    6
    7
    8
    9
    output{
    elashticsearch{
    hosts =>["XX.XX.XX.XX:9200"]
    codec => "json"
    action => "index"
    index => "INDEX_NAME"
    document_id => "test"
    }
    }
内置参数
  • sql_last_value:用于计算要查询的行。在运行任何查询之前,设置为1970年1月1日(星期四),如果use_column_value为true并设置tracking_column,则设置为0。
数据库驱动安装

ES的jdbc插件需要手动指定驱动,从下面这个网站上可以删掉后半部分的url,选取自己想要的版本进行下载

1
wget "https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.47.tar.gz" --no-check-certificate

ES jdbc同步插件

这是一个小坑,目前常用的版本已经将该插件作为内置插件,无需进行安装,在网上寻找了很久匹配版本发现都找不到,最后发现已经集成了这个插件