Applies to v3.3.8+.
This solution synchronizes data with a delay: list data is refreshed according to the data-sync schedule configured in logstash, so new data does not show up in the list immediately.
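How long the delay is depends on the schedule (cron) setting of the jdbc input in the generated logstash configuration; the configuration shown later on this page, for example, polls the database once per minute:

schedule => "* * * * *"   # cron-style schedule of the jdbc input: sync once per minute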
Software and configuration prerequisites
- Elasticsearch 6.8.1
- Logstash 6.8.1
- iform-logstash-client.jar: receives the logstash configuration files pushed out by IFORM and adds them to logstash's pipelines.yml
- RabbitMQ 3.x: configuration files are pushed via an MQ broadcast; only RabbitMQ is supported for now, and the client uses the same MQ instance, the same user and the same virtual host path as iform
- database driver file; this page uses mysql as the example
- ik analyzer template file logstash.ik.json
Software installation
Elasticsearch installation
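Installation itself is not covered in depth here; as one illustrative route (a sketch assuming Docker is available, not the only supported install method), Elasticsearch 6.8.1 can be started with:

docker run -d --name elasticsearch \
  -p 9200:9200 -p 9300:9300 \
  -e "discovery.type=single-node" \
  docker.elastic.co/elasticsearch/elasticsearch:6.8.1

The elasticsearch-analysis-ik plugin (matching version 6.8.1) also needs to be installed in Elasticsearch, since the logstash.ik.json template used later relies on the ik analyzers.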
Logstash installation
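Logstash 6.8.1 can be run the same way (again only a Docker-based sketch; the host paths are assumptions chosen so that the container paths used later on this page line up with the start.sh settings):

docker run -d --name logstash \
  -v /opt/docker/logstash-es/logstash_conf/pipelines.yml:/usr/share/logstash/config/pipelines.yml \
  -v /opt/docker/logstash-es/logstash_conf/logstash:/usr/share/logstash/pipeline \
  docker.elastic.co/logstash/logstash:6.8.1

With these mounts, the sync configs written under LOGSTASH_PIPELINES_DIR (.../logstash_conf/logstash/mysql) appear inside the container as /usr/share/logstash/pipeline/mysql/, matching the path.config entries in pipelines.yml, and the mysql driver jar plus logstash.ik.json can be placed in .../logstash_conf/logstash/ on the host so they are visible under /usr/share/logstash/pipeline/ in the container.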
RabbitMQ installation
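RabbitMQ 3.x can also be run via Docker (illustrative only); the user, password and virtual host must be the same ones iform itself uses, since the client and iform share the same MQ, user and virtual host:

docker run -d --name rabbitmq \
  -p 5672:5672 -p 15672:15672 \
  -e RABBITMQ_DEFAULT_USER=iformoptimize \
  -e RABBITMQ_DEFAULT_PASS=iformoptimize \
  -e RABBITMQ_DEFAULT_VHOST=/iformoptimize \
  rabbitmq:3-management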
iform-logstash-client.jar installation
If logstash is deployed on more than one server, the @Queue annotation on com.ak.iform.logstash.client.rabbitmq.consumer.RabbitLogstashQueueConsumer in the source project iform-provider-logstash-client needs to be changed so that each server has its own queue: the broadcast message must be delivered to every logstash server, and each server keeps its own copy of the data-sync configuration files (see the sketch below).
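The actual source of RabbitLogstashQueueConsumer is not reproduced here; the following is only a minimal Spring AMQP sketch of the underlying pattern, with queue and exchange names that are illustrative assumptions. With a fanout broadcast, each logstash server declares a distinct queue name in the @Queue annotation so that every server receives its own copy of the pushed configuration.

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

// Illustrative sketch only -- not the actual RabbitLogstashQueueConsumer source.
// Each logstash server uses its own queue name (e.g. "...node-1" on one server,
// "...node-2" on another) bound to the same fanout exchange, so the broadcast
// reaches every server.
@Component
public class LogstashConfigBroadcastListener {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "iform.logstash.config.node-1", durable = "true"),
            exchange = @Exchange(value = "iform.logstash.config.fanout", type = ExchangeTypes.FANOUT)))
    public void onConfigPushed(String logstashConfig) {
        // Write the received config into LOGSTASH_PIPELINES_DIR and register the
        // new pipeline in the pipelines.yml pointed to by LOGSTASH_PIPELINES.
    }
}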
This program is a client provided by iform; simply build and package the source project iform-provider-logstash-client.
Configuration parameters (set via environment variables; Windows is used as the example):
- logstash pipeline configuration file path: LOGSTASH_PIPELINES=D:\docker\logstash\conf.6.8.1\pipelines.yml
- logstash sync file directory (the directory where the logstash configs generated by iform's data template feature, which sync mysql data to elasticsearch, are stored): LOGSTASH_PIPELINES_DIR=D:\docker\logstash\conf.6.8.1\logstash\mysql
- RabbitMQ host IP: SPRING_RABBITMQ_HOST=192.168.3.220
- RabbitMQ username: SPRING_RABBITMQ_USERNAME=iformoptimize
- RabbitMQ password: SPRING_RABBITMQ_PASSWORD=iformoptimize
- RabbitMQ virtual host path: SPRING_RABBITMQ_VIRTUAL_HOST=/iformoptimize
The client's startup scripts are start.bat and start.sh.
start.bat
@echo off
set LOGSTASH_PIPELINES=D:\docker\logstash\conf.6.8.1\pipelines.yml
set LOGSTASH_PIPELINES_DIR=D:\docker\logstash\conf.6.8.1\logstash\mysql
set SPRING_RABBITMQ_HOST=192.168.3.118
set SPRING_RABBITMQ_USERNAME=iformoptimize
set SPRING_RABBITMQ_PASSWORD=iformoptimize
set SPRING_RABBITMQ_VIRTUAL_HOST=/iformoptimize
java -Djava.awt.headless=true -Dfile.encoding=UTF-8 -Duser.timezone=GMT+8 -XX:SurvivorRatio=8 -Xms128m -Xmx128m -Xmn80m -jar iform-logstash-client.jar
pause
start.sh
#!/bin/bash
shome=$(cd `dirname $0`; pwd)
# set variables
export LOGSTASH_PIPELINES=/opt/docker/logstash-es/logstash_conf/pipelines.yml
export LOGSTASH_PIPELINES_DIR=/opt/docker/logstash-es/logstash_conf/logstash/mysql
export SPRING_RABBITMQ_HOST=192.168.3.220
export SPRING_RABBITMQ_USERNAME=iformoptimize
export SPRING_RABBITMQ_PASSWORD=iformoptimize
export SPRING_RABBITMQ_VIRTUAL_HOST=/iformoptimize
# run client
java -Djava.awt.headless=true -Dfile.encoding=UTF-8 -Duser.timezone=GMT+8 -XX:SurvivorRatio=8 -Xms128m -Xmx128m -Xmn80m -jar ${shome}/iform-logstash-client.jar >/dev/null 2>&1 &
Enabling Elasticsearch in IFORM
- Enable the global switch com.ak.db.elasticsearch.enabled: true
- Enable the ES-search configuration for the business list in question, taking the employee list as an example:
  - Add the configuration item elasticsearch.com.ak.iform.org.party.repository.impl.PartyEmployeeRepositoryImpl.query.enabled: true to the platform service
  - Configuration key pattern: elasticsearch.[class full name].[method].enabled (both switches are sketched together just below)
  - The switch is evaluated by com.ak.iform.base.framework.repository.IRepository.isElasticsearchOpenning(String)
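Taken together, the two switches can be expressed in the platform service configuration roughly as follows (a sketch only; adapt it to however the platform service is actually configured in your deployment):

# global switch for the elasticsearch integration
com.ak.db.elasticsearch.enabled: true
# per-list switch, following the pattern elasticsearch.[class full name].[method].enabled
elasticsearch.com.ak.iform.org.party.repository.impl.PartyEmployeeRepositoryImpl.query.enabled: true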
- Write the JSON configuration file that defines the ES query filter and sort; the file is stored in the same package as the corresponding Po class:
{
"query": {
"parameters": [
{
"key": "Q^user_id_^NE",
"value": "-1"
},
{
"key": "Q^status_^NE",
"value": "deleted"
}
],
"sorts": [
{
"field": "create_time_",
"order": "DESC"
}
]
}
}
- Manually create the logstash sync configuration file:
input {
jdbc {
jdbc_driver_library => "/usr/share/logstash/pipeline/mysql-connector-java-5.1.36.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://192.168.3.240:3306/iform_boot_v3_optimize_05?useUnicode=true&characterEncoding=utf-8&autoReconnect=true"
jdbc_user => "root"
jdbc_password => "root"
jdbc_paging_enabled => "true"
jdbc_page_size => "100000"
schedule => "* * * * *"
    statement => "SELECT * FROM IFORM_PARTY_EMPLOYEE LEFT JOIN ( SELECT ID_ USER_ID_, ACCOUNT_, IS_SUPER_ FROM IBPS_PARTY_USER ) A ON USER_ID_ = ID_ LEFT JOIN ( SELECT ID_ ORG_ID_, NAME_ ORG_NAME_ FROM IFORM_PARTY_ORG ) B ON ORG_ID_ = GROUP_ID_ WHERE update_time_ >= :sql_last_value order by update_time_ asc"
type => "jdbc"
use_column_value => true
tracking_column_type => "timestamp"
tracking_column => "update_time_"
record_last_run => true
lowercase_column_names => true
last_run_metadata_path => "/usr/share/logstash/data/sync_point_of_iform_party_employee"
}
}
filter {
ruby {
code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"
}
ruby {
code => "event.set('@timestamp',event.get('timestamp'))"
}
mutate {
remove_field => ["timestamp"]
}
ruby {
code => "event.set('create_time_', event.get('create_time_').time.localtime + 8*60*60)"
}
ruby {
code => "event.set('update_time_', event.get('update_time_').time.localtime + 8*60*60)"
}
}
output {
elasticsearch {
hosts => ["192.168.3.220:9200"]
index => "iform_party_employee"
document_id => "%{id_}"
document_type => "iform_party_employee"
template_overwrite => true
template => "/usr/share/logstash/pipeline/logstash.ik.json"
}
stdout {
codec => json_lines
}
}
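The template => "/usr/share/logstash/pipeline/logstash.ik.json" referenced in the output block above is the ik analyzer template listed in the prerequisites. Its exact contents are supplied with IFORM; the following is only a minimal sketch of what such an Elasticsearch 6.x index template can look like (it assumes the elasticsearch-analysis-ik plugin is installed, and the index pattern and field rules are illustrative):

{
  "index_patterns": ["iform_*"],
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 0
  },
  "mappings": {
    "_default_": {
      "dynamic_templates": [
        {
          "strings_as_ik_text": {
            "match_mapping_type": "string",
            "mapping": {
              "type": "text",
              "analyzer": "ik_max_word",
              "search_analyzer": "ik_smart",
              "fields": {
                "keyword": { "type": "keyword", "ignore_above": 256 }
              }
            }
          }
        }
      ]
    }
  }
}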
- Add the pipeline entry in pipelines.yml:
- pipeline.id: iform_party_employee
  path.config: "/usr/share/logstash/pipeline/mysql/iform_party_employee.conf"
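Once logstash has picked up the pipeline and the schedule has fired at least once, the result can be checked directly against Elasticsearch (host and index name as used on this page):

curl "http://192.168.3.220:9200/_cat/indices?v"
curl "http://192.168.3.220:9200/iform_party_employee/_search?pretty&size=1"

The first command should list the iform_party_employee index; the second returns a sample synced document.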
Author: hugh    Created: 2024-01-03 10:32
Last edited by: hugh    Updated: 2024-11-15 11:25