中间件包括对ShardingSphere分库分表、Redis、Kafka、Elastic-Search、db2es-admin、db2es-client等的封装。
database_0数据库容纳数据表table_0, table_1, …, table_15
database_1数据库容纳数据表table_16, table_17, …, table_31
database_2数据库容纳数据表table_32, table_33, …, table_47
database_3数据库容纳数据表table_48, table_49, …, table_63
id : 主键, BIGINT UNSIGNED类型 (分布式主键, 例如:雪花算法)
row_create_time : 记录创建时间, datetime(3), 必须设置默认值: CURRENT_TIMESTAMP, 业务代码不可修改该字段
row_update_time : 记录最后一次修改时间, datetime(3), 必须勾选根据当前时间戳更新, 业务代码不可修改该字段
<dependency>
<groupId>org.wyyt</groupId>
<artifactId>elasticsearch-starter</artifactId>
<version>${lastest_version}</version>
</dependency>
sharding:
# 是否开启ShardingSphere数据源
enabled: true
# 分布式集群编号id, 不能重复(取值范围0~1023)
work-id: 1
# 是否输出执行的sql(true:打印; false:不打印)
show-sql: true
# ACM配置信息
acm:
datasource:
data-id: scfs.xml.datasource.encrypt
group: SIJIBAO_ORDER_CENTER_GROUP
dimension:
data-id: scfs.xml.dimension
group: SIJIBAO_ORDER_CENTER_GROUP
table:
data-id: scfs.xml.table
group: SIJIBAO_ORDER_CENTER_GROUP
acmConfigPath: acmConfig.properties
nacosLocalSnapshotPath: /wyyt/etc/acm/sql_tool
nacosLogPath: /wyyt/logs/tomcat/sql_tool/
其中, scfs.xml.datasource.encrypt数据源配置信息如下:
<?xml version="1.0" encoding="UTF-8"?>
<datasources>
<!-- name: 数据库的逻辑名称. 必填项, 必须唯一 -->
<!-- index: 数据库的索引(分库时使用), 从0开始, 默认为0 -->
<datasource name="finance_center_main_0" index="0">
<!-- 数据库IP地址. 必填项 -->
<host>192.168.0.197</host>
<!-- 数据库端口. 必填项 -->
<port>3306</port>
<!-- 数据库的真实物理名称. 必填项 -->
<databaseName>finance_center_main_0</databaseName>
<!-- 数据库的账号. 必填项 -->
<username>root</username>
<!-- 数据库的密码. 必填项 -->
<password>EqkPepuq0FN49w=</password>
<!-- 配置连接池中最小可用连接的个数 -->
<minIdle>10</minIdle>
<!-- 配置连接池中最大可用连接的个数 -->
<maxActive>20</maxActive>
</datasource>
<datasource name="finance_center_main_1" index="1">
<host>192.168.0.197</host>
<port>3306</port>
<databaseName>finance_center_main_1</databaseName>
<username>root</username>
<password>EqkPepuq0FNoCe49w=</password>
<minIdle>10</minIdle>
<maxActive>20</maxActive>
</datasource>
<!--******当SQL所涉及的数据表在以上数据源中查询不到时, 会自动去isDefault=true(该属性默认为false)的数据源中寻找, 最多只能拥有一个isDefault=true的数据源******-->
<datasource name="finance_other" isDefault="true">
<host>192.168.5.110</host>
<port>3306</port>
<databaseName>finance_dev</databaseName>
<username>fin</username>
<password>TdAvSNMlMQhNY2MG9pzKY=</password>
<minIdle>10</minIdle>
<maxActive>20</maxActive>
</datasource>
</datasources>
scfs.xml.dimension维度配置信息如下:
<?xml version="1.0" encoding="UTF-8"?>
<dimensions>
<!--
name: 维度名称,必须唯一。不允许为空
priority: 当多个拆分键在同一条SQL中出现时,维度的优先级,数值越低,优先级越高。 不允许为空。
当priority="0"时,优先级最高,被视为是主维度,多个维度之间只能有一个主维度
description: 当前维度的描述信息,不允许为空
-->
<dimension name="order-no" priority="0" description="订单维度">
<!-- ref: 数据库的逻辑名称。不允许为空 -->
<datasource ref="finance_center_main_0"/>
<datasource ref="finance_center_main_1"/>
</dimension>
</dimensions>
scfs.xml.table数据表配置信息如下:
<?xml version="1.0" encoding="UTF-8"?>
<tables>
<!--
name: 数据表的逻辑名称,必须唯一。不允许为空
pkName: 主键。 可以为空,为空默认为id
rowCreateTime: 记录创建时间字段(时间精确到毫秒),为空默认为row_create_time
rowUpdateTime: 记录最后一次修改时间字段(时间精确到毫秒),为空默认为row_update_time
bindingName: 具有相同绑定名称的表为一组绑定表, 为空表示不和任何表组成绑定表
broadcast: 是否是广播表(true: 是广播表; false: 不是)。为空表示false
-->
<table name="fin_pay_fund_flow_out_fund" pkName="id">
<!--
ref: 维度信息xml配置中的维度名称name
tableCountNum: 逻辑表在该维度下的分表总个数, 默认为1
shardingColumn: 逻辑表在该维度下的拆分键字段, 默认为id
tableNameFormat: 逻辑表与物理表之间的映射关系表达式, 为空默认是{逻辑名称} (也可以是: {逻辑名称}_%s, 其中, %s为下标索引, 从0开始到{tableCountNum-1})
-->
<dimension ref="order-no" tableCountNum="64" shardingColumn="order_no"/>
</table>
<table name="fin_sjb_order_out_fund" pkName="id">
<dimension ref="order-no" tableCountNum="64" shardingColumn="order_no"/>
</table>
<table name="fin_sjb_order" pkName="id">
<dimension ref="order-no" tableCountNum="64" shardingColumn="sjb_stock_no"/>
</table>
<table name="fin_sjb_order_sub_line" pkName="id">
<dimension ref="order-no" tableCountNum="64" shardingColumn="sjb_stock_no"/>
</table>
<table name="fin_sjb_order_feerate_content" pkName="id">
<dimension ref="order-no" tableCountNum="64" shardingColumn="sjb_stock_no"/>
</table>
<table name="fin_payment_days_info" pkName="id">
<dimension ref="order-no" tableCountNum="64" shardingColumn="sjb_stock_no"/>
</table>
<table name="fin_sjb_order_pay_line" pkName="id">
<dimension ref="order-no" tableCountNum="64" shardingColumn="sjb_stock_no"/>
</table>
<table name="fin_pay_fund_flow_detail" pkName="id">
<dimension ref="order-no" tableCountNum="64" shardingColumn="stock_no"/>
</table>
<table name="fin_external_capital_change_wide" pkName="id">
<dimension ref="order-no" tableCountNum="64" shardingColumn="order_no"/>
</table>
<table name="fin_ac_out_fund_chg" pkName="id">
<dimension ref="order-no" tableCountNum="64" shardingColumn="trade_no"/>
</table>
</tables>
/**
 * Registers the ShardingSphere-backed {@code ShardingDataSource}
 * as the application's {@code DataSource} bean, so all JDBC access
 * is routed through the sharding layer.
 */
@Configuration
public class DataSourceConfig {

    @Autowired
    private ShardingDataSource shardingDataSource;

    // Expose the injected sharding data source as the primary DataSource.
    @Bean
    public DataSource dataSource() {
        return this.shardingDataSource;
    }
}
acm:
  data-id: scfs.tool
  group: SIJIBAO_ORDER_CENTER_GROUP
acmConfigPath: acmConfig.properties
nacosLocalSnapshotPath: /wyyt/etc/acm/db2es/
nacosLogPath: /wyyt/logs/sql_tool/

其中, scfs.tool配置如下:

sharding:
  enabled: true
  work-id: 300
  show-sql: false
acm:
  datasource:
    data-id: scfs.xml.datasource.encrypt
    group: SIJIBAO_ORDER_CENTER_GROUP
  dimension:
    data-id: scfs.xml.dimension
    group: SIJIBAO_ORDER_CENTER_GROUP
  table:
    data-id: scfs.xml.table
    group: SIJIBAO_ORDER_CENTER_GROUP
acmConfigPath: acmConfig.properties
nacosLocalSnapshotPath: /wyyt/etc/acm/sql_tool
nacosLogPath: /wyyt/logs/tomcat/springcloud/sql_tool/
# sql tool工具端口
sql.tool.port=10086
# sql tool数据库配置
db.host=192.168.0.197
db.port=3306
db.username=root
encrypt.db.password=Xzl9H5z0zWOGu5nh=
db.dbName=scfs_sql_developer
<dependency>
<groupId>org.wyyt</groupId>
<artifactId>redis-starter</artifactId>
<version>${lastest_version}</version>
</dependency>
spring:
redis:
host: 192.168.6.167
port: 6379
password: ********
timeout: 2000
database: 0
jedis:
pool:
max-idle: 1000
max-wait: -1
min-idle: 0
@Autowired
private RedisService redisService;

// Read/write round-trip: store the current timestamp under KEY,
// then assert that it can be read back.
public void setAndGet() {
    final long value = System.currentTimeMillis();
    this.redisService.set(KEY, value);
    Assert.notNull(this.redisService.get(KEY), "set & get 失败");
}
//分布式锁 public void lock() { try (RedisService.Lock lock = this.redisService.getLock(KEY, 10000L, 6000L)) { if (
lock.hasLock()) { System.out.println("拿到锁了: " + lock.lockKey() + " " + lock.requestId()); } else { System.err.println("
没有拿到锁"); } } Assert.isNull(this.redisService.get(KEY), "lock失败"); }
<dependency>
<groupId>org.wyyt</groupId>
<artifactId>kafka-starter</artifactId>
<version>${lastest_version}</version>
</dependency>
spring:
kafka:
bootstrap-servers: 192.168.6.164:9092,192.168.6.165:9092,192.168.6.166:9092
listener:
missing-topics-fatal: false
producer:
retries: 3
batch-size: 1024
buffer-memory: 33554432
acks: all
compression-type: lz4
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
@Autowired
private KafkaService kafkaService; // was mis-declared as KafkaTest kafkaTest; the method below uses kafkaService

// Synchronous send: publishes the current timestamp to TOPIC_NAME under key "KEY".
public void send() throws Exception {
    this.kafkaService.send(TOPIC_NAME, "KEY", String.valueOf(System.currentTimeMillis()));
}
// Synchronous send inside a transaction: if the method body fails,
// the message will not be consumed.
@TranKafka
public void sendTran() throws Exception {
    final String payload = String.valueOf(System.currentTimeMillis());
    this.kafkaService.send(TOPIC_NAME, "KEY", payload);
}
//异步发送(带事务)
@TranKafka public void sendTranAsync() { this.kafkaService.sendAsync(TOPIC_NAME, "KEY", String.valueOf(
System.currentTimeMillis()), (sendResult, throwable) -> { log.info(sendResult.toString()); Assert.isTrue(false, "
回调方法中的异常是不会回滚的"); }); Assert.isTrue(false, "能够正常回滚"); }
server.port = 9999
zookeeper.servers = 192.168.6.166:2181,192.168.6.167:2181,192.168.0.197:2181
db.host = 192.168.0.197
db.port = 3306
db.name = kafka_monitor
db.username = root
db.password = XXXXX
retention.days = 3
topic.blacklist =
<dependency>
<groupId>org.wyyt</groupId>
<artifactId>elasticsearch-starter</artifactId>
<version>${lastest_version}</version>
</dependency>
elasticsearch:
  enabled: true
  hostnames: 192.168.6.165:9900,192.168.6.166:9900,192.168.6.167:9900
  username: elastic
  password: ******
  max-conn-total: 100
  max-conn-per-route: 20
@Autowired
private ElasticSearchService elasticSearchService;

// Look up a single document by its primary key and print it.
public void getById() throws Exception {
    String response = this.elasticSearchService.getById(INDEX_NAME, PRIMARY_KEY_VALUE, String.class);
    System.out.println(response);
}
//条件查询 public void test06_search() throws Exception { SearchRequest searchRequest = new SearchRequest(INDEX_NAME); BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder(); boolQueryBuilder.must(QueryBuilders.rangeQuery("id").gte(1) .lte(20)); //范围查询。must相当于SQL where字句中的AND; should则相当于OR boolQueryBuilder.must(QueryBuilders.matchQuery("remark", " 颚ABCDEFGHIJKLMNOPQRSTUVWXYZ_1234567890987654321")); //match查询 SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.query(boolQueryBuilder); searchSourceBuilder.from(0); //获取的起始位置,可用以分页 searchSourceBuilder.size(10);//获取的document记录数,可用于分页 searchSourceBuilder.sort("row_create_time", SortOrder.ASC); //排序 searchSourceBuilder.fetchSource(new String[]{"id", "name", "remark"}, new String[]{}); searchRequest.source( searchSourceBuilder); List response = this.elasticSearchService.select(searchRequest, String.class); for (String s : response) { System.out.println(s); } }
//分页查询 public void page() throws IOException { SearchRequest searchRequest = new SearchRequest(INDEX_NAME); BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder(); boolQueryBuilder.must(QueryBuilders.matchPhraseQuery("id", " 1")); //match查询 SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.query( boolQueryBuilder); searchRequest.source(searchSourceBuilder); IPage page = this.elasticSearchService.page( searchRequest, TestEntity.class, new Page<>(1, 10)); System.out.println(page.getRecords()); }
## kafka集群所使用的zookeeper集群地址, 多个用逗号隔开
zookeeper.servers=192.168.6.166:2181,192.168.6.167:2181
##
## 目标ElasticSearch的地址, 多个用逗号隔开
elasticsearch.hostnames=192.168.6.165:9900,192.168.6.166:9900,192.168.6.167:9900
## ElasticSearch的用户名
elasticsearch.username=finance
## ElasticSearch的密码
encrypt.elasticsearch.password=AQZRHONdKs=
##
## db2es数据库的地址
db.host=192.168.0.197
## db2es数据库的端口
db.port=3306
## db2es数据库的库名
db.databaseName=scfs_db2es
## db2es数据库的用户名
db.username=root
## db2es数据库的密码
encrypt.db.password=APgXwToHDGFNOz0=