1、基本需求
1、 项目中我们希望 能够按照时间、类别来添加表。但是sharding-jdbc 是固定配置 的 actual-data-nodes 参数。也就是说我们需要提前创建好分表或者分库。那么我们需要如何来实现动态创建表,并且动态刷新 actual-data-nodes 呢。
2、思路就是写个定时器来动态创建表 ,在创建表的时候 动态刷新 actual-data-nodes 实现动态创建表被shard加载。
2、功能实现
2.1、添加依赖
1 2 3 4 5 6 7 8 9 10
| <dependency> <groupId>org.apache.shardingsphere</groupId> <artifactId>sharding-jdbc-spring-boot-starter</artifactId> <version>4.0.0-RC1</version> </dependency> <dependency> <groupId>org.apache.shardingsphere</groupId> <artifactId>sharding-jdbc-spring-namespace</artifactId> <version>4.0.0-RC1</version> </dependency>
|
2.2、yml配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| sharding: binding-tables[0]: ims_test_result,ims_test_sample_fetch,ims_test_sample_diluent,ims_test_reagent_add,ims_test_ls_add,ims_test_incubate,ims_test_read tables: ims_sample_base: actual-data-nodes: m1.sharding_data_nodes_2022 key-generator: column: id type: SNOWFLAKE props: worker.id: ${workerId} table-strategy: complex: sharding-columns: submit_work_time,sample_uid algorithm-class-name: com.chivd.common.algorithm.TableShardingSampleAlgorithm
|
2.3、具体分片算法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
| public class TableShardingSampleAlgorithm implements ComplexKeysShardingAlgorithm<String> { private static final String COLUMN_SAMPLE_UID = "sample_uid"; private static final String COLUMN_SUBMIT_TIME = "submit_work_time"; @Override public Collection<String> doSharding(Collection<String> collection, ComplexKeysShardingValue<String> complexKeysShardingValue) { String logicTableName = complexKeysShardingValue.getLogicTableName(); Collection<String> submitWorkTimeCollection = complexKeysShardingValue.getColumnNameAndShardingValuesMap().getOrDefault(COLUMN_SUBMIT_TIME, new ArrayList<>()); ArrayList<String> submitWorkTimeList = new ArrayList<>(submitWorkTimeCollection); if (CollectionUtils.isNotEmpty(submitWorkTimeList)) { Set<String> set = new HashSet<>(); for (int i = 0; i < submitWorkTimeList.size(); i++) { Date date = DateUtils.parseDate(submitWorkTimeList.get(i)); StringBuffer tableName = new StringBuffer(); tableName.append(logicTableName).append("_").append(DateUtils.parseDateToStr(DateUtils.YYYYMM, date)); set.add(tableName.toString()); } return set; } Range<String> submitWorkTimeRange = complexKeysShardingValue.getColumnNameAndRangeValuesMap().get(COLUMN_SUBMIT_TIME); if (submitWorkTimeRange != null) { Set<String> result = new HashSet<>(); Date lowerDate = DateUtils.parseDate(submitWorkTimeRange.lowerEndpoint()); Date upperDate = DateUtils.parseDate(submitWorkTimeRange.upperEndpoint()); int monthSpace = DateUtils.getMonthSpace(lowerDate, upperDate); for (int i = 0; i < monthSpace; i++) { result.add(logicTableName + "_" + DateUtils.parseDateToStr(DateUtils.YYYYMM, lowerDate)); lowerDate = DateUtils.addMonths(lowerDate, 1); } return result; } Collection<String> testUids = complexKeysShardingValue.getColumnNameAndShardingValuesMap().getOrDefault(COLUMN_SAMPLE_UID, new ArrayList<>()); if (CollectionUtils.isNotEmpty(testUids)) { return testUids.stream().map(uid -> { String[] split = uid.split("-"); StringBuffer tableName = new StringBuffer(); tableName.append(logicTableName) .append("_") .append(split[split.length - 1].substring(0,6)); return tableName.toString(); }).collect(Collectors.toSet()); } return null; } }
|
2.4、创建配置表
配置表包含所有需要分表的逻辑表表名,分表开始年月等信息
1 2 3 4 5 6 7 8
| CREATE TABLE `ims_sharding_config` ( `id` int(11) NOT NULL AUTO_INCREMENT, `table_name` varchar(100) DEFAULT NULL COMMENT '表名', `start_year_month` varchar(20) DEFAULT NULL COMMENT '分表开始年月', `comment` varchar(100) DEFAULT NULL COMMENT '备注', `is_deleted` tinyint(4) DEFAULT '0' COMMENT '(0-未删除 1-删除)', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=21 DEFAULT CHARSET=utf8mb4;
|
2.5、创建定时任务
- 启动时自动刷新actual-data-nodes节点
- 自动创建下月分表
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140
| @Slf4j @Component public class ShardingTableUtils { @Autowired DataSource dataSource; @Autowired IShardingConfigService shardingConfigService;
@PostConstruct public void startRefresh() { AutoCreateTable(); }
public void actualTablesRefresh() { log.info("-------------- 开始刷新sharding配置 ---------------"); try { List<ShardingConfig> shardingConfigList = shardingConfigService.listShardingConfig(); ShardingDataSource dataSource = (ShardingDataSource) this.dataSource; if (shardingConfigList == null || shardingConfigList.isEmpty()) { log.info("【sharding自动配置】配置表为空"); return; } String curYearAndMonth = DateUtils.getYearAndMonth(DateUtils.monthAdd(new Date(),1).getTime()); Field modifiersField = Field.class.getDeclaredField("modifiers"); modifiersField.setAccessible(true); for (ShardingConfig item : shardingConfigList) { TableRule tableRule = null; tableRule = dataSource.getRuntimeContext().getRule().getTableRule(item.getTableName()); List<DataNode> dataNodes = tableRule.getActualDataNodes(); String dataSourceName = dataNodes.get(0).getDataSourceName(); List<String> monthBetween = getMonthBetween(item.getStartYearMonth(), curYearAndMonth); List<DataNode> newDataNodes = monthBetween.stream() .map(yearMonth -> new DataNode(dataSourceName + "." + item.getTableName() + "_" + yearMonth)).collect(Collectors.toList()); Field actualDataNodesField = TableRule.class.getDeclaredField("actualDataNodes"); actualDataNodesField.setAccessible(true); modifiersField.setInt(actualDataNodesField, actualDataNodesField.getModifiers() & ~Modifier.FINAL); actualDataNodesField.set(tableRule, newDataNodes); Set<String> actualTables = Sets.newHashSet(); Map<DataNode, Integer> dataNodeIndexMap = Maps.newHashMap(); AtomicInteger index = new AtomicInteger(0); newDataNodes.forEach(dataNode -> { actualTables.add(dataNode.getTableName()); if (index.intValue() == 0) { dataNodeIndexMap.put(dataNode, 0); } else { dataNodeIndexMap.put(dataNode, index.intValue()); } index.incrementAndGet(); }); Field actualTablesField = TableRule.class.getDeclaredField("actualTables"); actualTablesField.setAccessible(true); actualTablesField.set(tableRule, actualTables); dataNodeIndexMapField.setAccessible(true); dataNodeIndexMapField.set(tableRule, dataNodeIndexMap); datasourceToTablesMap.put(dataSourceName, actualTables); Field datasourceToTablesMapField = TableRule.class.getDeclaredField("datasourceToTablesMap"); datasourceToTablesMapField.setAccessible(true); datasourceToTablesMapField.set(tableRule, datasourceToTablesMap); } } catch (Exception e) { e.printStackTrace(); log.info("【sharding自动配置】异常" + e.getMessage()); } }
private void AutoCreateTable() { log.info("-------------- 开始创建分表 ---------------"); List<ShardingConfig> shardingConfigList = shardingConfigService.listShardingConfig(); if (shardingConfigList == null || shardingConfigList.isEmpty()) { log.info("【sharding自动配置】配置表为空"); return; } String curYearAndMonth = DateUtils.getYearAndMonth(DateUtils.monthAdd(new Date(),2).getTime()); for (ShardingConfig item : shardingConfigList) { List<String> monthBetween = new ArrayList<>(); try { monthBetween = getMonthBetween(item.getStartYearMonth(), curYearAndMonth); } catch (ParseException e) { log.info("【sharding自动配置】日期转化失败" + e.getMessage()); e.printStackTrace(); } monthBetween.forEach(yearMonth -> { try { shardingConfigService.createTable(item.getTableName(), item.getTableName() + "_" + yearMonth); }catch (Exception ignored){} }); } actualTablesRefresh(); }
private static List<String> getMonthBetween(String minDate, String maxDate) throws ParseException { ArrayList<String> result = new ArrayList<>(); SimpleDateFormat sdf = new SimpleDateFormat("yyyyMM"); Calendar min = Calendar.getInstance(); Calendar max = Calendar.getInstance(); min.setTime(sdf.parse(minDate)); min.set(min.get(Calendar.YEAR), min.get(Calendar.MONTH), 1); max.setTime(sdf.parse(maxDate)); max.set(max.get(Calendar.YEAR), max.get(Calendar.MONTH), 2); Calendar curr = min; while (curr.before(max)) { result.add(sdf.format(curr.getTime())); curr.add(Calendar.MONTH, 1); } return result; }
@Scheduled(cron = "0 0 1 27 * ?") public void refreshScheduled() { AutoCreateTable(); } }
|