diff --git a/pom.xml b/pom.xml index 38e25e6..33766b9 100644 --- a/pom.xml +++ b/pom.xml @@ -234,6 +234,7 @@ sync-generator sync-common sync-gather + sync-mq pom diff --git a/sync-common/src/main/java/com/lyr/common/annotation/Params.java b/sync-common/src/main/java/com/lyr/common/annotation/Params.java new file mode 100644 index 0000000..39551e1 --- /dev/null +++ b/sync-common/src/main/java/com/lyr/common/annotation/Params.java @@ -0,0 +1,25 @@ +package com.lyr.common.annotation; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + + +@Target(ElementType.METHOD) +@Retention(RetentionPolicy.RUNTIME) +public @interface Params { + + Valid[] value(); + + + @Target(ElementType.ANNOTATION_TYPE) + @Retention(RetentionPolicy.RUNTIME) + @interface Valid { + + String value(); + + String message(); + } + +} diff --git a/sync-common/src/main/java/com/lyr/common/core/redis/RedisCache.java b/sync-common/src/main/java/com/lyr/common/core/redis/RedisCache.java index c3a3e75..3499cc3 100644 --- a/sync-common/src/main/java/com/lyr/common/core/redis/RedisCache.java +++ b/sync-common/src/main/java/com/lyr/common/core/redis/RedisCache.java @@ -1,5 +1,6 @@ package com.lyr.common.core.redis; +import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.BoundSetOperations; @@ -10,6 +11,7 @@ import org.springframework.stereotype.Component; import java.util.*; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; /** * spring redis 工具类 @@ -177,7 +179,12 @@ public class RedisCache { * @param key * @return */ - public Set getCacheSet(final String key) { + public Set getCacheSet(final String key, Class... clazz) { + Set members = redisTemplate.opsForSet().members(key); + if (clazz.length > 0) { + Set collect = (Set) members.stream().map(o -> JSON.parseObject(JSON.toJSONString(o), clazz[0])).collect(Collectors.toSet()); + return collect; + } return redisTemplate.opsForSet().members(key); } diff --git a/sync-common/src/main/java/com/lyr/common/utils/bean/DataReconciliationService.java b/sync-common/src/main/java/com/lyr/common/utils/bean/DataReconciliationService.java new file mode 100644 index 0000000..46a512b --- /dev/null +++ b/sync-common/src/main/java/com/lyr/common/utils/bean/DataReconciliationService.java @@ -0,0 +1,139 @@ +package com.lyr.common.utils.bean; + +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.ObjectUtils; + +import java.lang.reflect.Field; +import java.util.*; + +@Slf4j +public class DataReconciliationService { + + + // 存储对账结果 + private Map> reconciliationResult = new HashMap<>(); + + public DataReconciliationService() { + reconciliationResult.put("新增", new ArrayList<>()); + reconciliationResult.put("删除", new ArrayList<>()); + reconciliationResult.put("修改", new ArrayList<>()); + } + + /** + * 对账方法,比较两个数据源中的数据 + * + * @param sourceData 源数据列表 + * @param targetData 目标数据列表 + * @return 对账结果 + */ + public Map> reconcileData(List sourceData, List targetData) { + if (sourceData == null || targetData == null) { + throw new IllegalArgumentException("Source and target data cannot be null"); + } + if (sourceData.isEmpty() || targetData.isEmpty()) { + return reconciliationResult; + } + + Map sourceMap = null; + Map targetMap = null; + for (Field field : sourceData.get(0).getClass().getDeclaredFields()) { + String idFieldName = field.getName(); + sourceMap = toMap(sourceData, idFieldName); + targetMap = toMap(targetData, idFieldName); + } + // 处理新增数据 + + + for (String id : targetMap.keySet()) { + if (!sourceMap.containsKey(id)) { + reconciliationResult.get("新增").add(targetMap.get(id)); + } + } + + // 处理删除数据 + for (String id : sourceMap.keySet()) { + if (!targetMap.containsKey(id)) { + reconciliationResult.get("删除").add(sourceMap.get(id)); + } + } + + + // 处理修改数据 + for (String id : sourceMap.keySet()) { + if (targetMap.containsKey(id)) { + T sourceRecord = sourceMap.get(id); + T targetRecord = targetMap.get(id); + String result = compareRecords(sourceRecord, targetRecord); + if (!"All attributes match".equals(result)) { + reconciliationResult.get("修改").add(sourceMap.get(id)); + } + } + } + // 记录对账结果 + log.info("Reconciliation result: {}", reconciliationResult); + return reconciliationResult; + } + + + /** + * 将数据列表转换为 Map + * + * @param data 数据列表 + * @param idFieldName 标识字段的名称 + * @return 转换后的 Map + */ + private Map toMap(List data, String idFieldName) { + Map map = new HashMap<>(); + for (T record : data) { + try { + Field idField = record.getClass().getDeclaredField(idFieldName); + idField.setAccessible(true); + + Object object = idField.get(record); + if (ObjectUtils.isNotEmpty(object)) { + String id = object.toString(); + map.put(id, record); + } + } catch (NoSuchFieldException | IllegalAccessException e) { + log.error("Error converting data to map: {}", e.getMessage()); + } + } + return map; + } + + /** + * 比较两个数据记录 + * + * @param source 源数据记录 + * @param target 目标数据记录 + * @return 对账结果字符串 + */ + private String compareRecords(T source, T target) { + StringBuilder result = new StringBuilder(); + + Class clazz = source.getClass(); + Field[] fields = clazz.getDeclaredFields(); + + for (Field field : fields) { + field.setAccessible(true); + try { + Object sourceValue = field.get(source); + Object targetValue = field.get(target); + + if (!Objects.equals(sourceValue, targetValue)) { + result.append(field.getName()).append(" mismatch: ").append(sourceValue).append(" vs ").append(targetValue).append(", "); + } + } catch (IllegalAccessException e) { + log.error("Error accessing field: {}", e.getMessage()); + } + } + + if (result.length() > 0) { + result.setLength(result.length() - 2); // 去掉最后一个逗号和空格 + } else { + result.append("All attributes match"); + } + + return result.toString(); + } +} diff --git a/sync-framework/src/main/java/com/lyr/framework/aspectj/ParameterValidationAspect.java b/sync-framework/src/main/java/com/lyr/framework/aspectj/ParameterValidationAspect.java new file mode 100644 index 0000000..70430e2 --- /dev/null +++ b/sync-framework/src/main/java/com/lyr/framework/aspectj/ParameterValidationAspect.java @@ -0,0 +1,81 @@ +package com.lyr.framework.aspectj; + +import com.lyr.common.annotation.Params; +import com.lyr.common.utils.AssertUtil; +import lombok.extern.slf4j.Slf4j; +import org.aspectj.lang.JoinPoint; +import org.aspectj.lang.annotation.Aspect; +import org.aspectj.lang.annotation.Before; +import org.aspectj.lang.reflect.MethodSignature; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import javax.validation.Validator; +import java.lang.reflect.Method; +import java.lang.reflect.Parameter; + +@Aspect +@Component +@Slf4j +public class ParameterValidationAspect { + + @Before("@annotation(params)") + public void validateParameters(JoinPoint joinPoint, Params params) { + MethodSignature signature = (MethodSignature) joinPoint.getSignature(); + Method method = signature.getMethod(); + Object[] args = joinPoint.getArgs(); + + for (Params.Valid valid : params.value()) { + Parameter[] parameters = method.getParameters(); + for (int i = 0; i < parameters.length; i++) { + Parameter parameter = parameters[i]; + if (parameter.getName().equals(valid.value())) { + ParameterValidator parameterValidator = getValidator(parameter.getType()); + parameterValidator.validate(args[i], valid.message()); + } + } + } + } + + + public interface ParameterValidator { + void validate(Object arg, String message); + } + + @Resource + private Validator validator; + + private ParameterValidator getValidator(Class clazz) { + + if (String.class.isAssignableFrom(clazz)) { + log.debug("String 类型参数非空判断"); + return (arg, message) -> AssertUtil.notEmpty(String.valueOf(arg), message); + } else if (Number.class.isAssignableFrom(clazz)) { + log.debug("Number 类型参数非空判断"); + return (arg, message) -> { + + }; + } else if (Boolean.class.isAssignableFrom(clazz)) { + log.debug("Boolean 类型参数非空判断"); + return (arg, message) -> { + + }; + } else if (clazz.isEnum()) { + log.debug("Enum 类型参数非空判断"); + return (arg, message) -> { + + }; + } else if (clazz.isArray()) { + log.debug("数组类型参数非空判断"); + return (arg, message) -> { + + }; + } else { + // 默认的验证器,处理未列出的类型 + return (arg, message) -> { + log.warn("Unsupported parameter type: {}", clazz.getName()); + // 可以在这里添加通用的验证逻辑 + }; + } + } +} diff --git a/sync-framework/src/main/java/com/lyr/framework/config/RedisConfig.java b/sync-framework/src/main/java/com/lyr/framework/config/RedisConfig.java index 41e915c..8928403 100644 --- a/sync-framework/src/main/java/com/lyr/framework/config/RedisConfig.java +++ b/sync-framework/src/main/java/com/lyr/framework/config/RedisConfig.java @@ -10,6 +10,7 @@ import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.script.DefaultRedisScript; +import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.SerializationException; import org.springframework.data.redis.serializer.StringRedisSerializer; @@ -30,13 +31,14 @@ public class RedisConfig extends CachingConfigurerSupport { FastJson2JsonRedisSerializer serializer = new FastJson2JsonRedisSerializer(Object.class); // 使用StringRedisSerializer来序列化和反序列化redis的key值 - template.setKeySerializer(new StringRedisSerializer()); + template.setKeySerializer(new StringRedisSerializerEx()); template.setValueSerializer(serializer); // Hash的key也采用StringRedisSerializer的序列化方式 - template.setHashKeySerializer(new StringRedisSerializer()); + template.setHashKeySerializer(new StringRedisSerializerEx()); template.setHashValueSerializer(serializer); + template.afterPropertiesSet(); return template; } diff --git a/sync-gather/pom.xml b/sync-gather/pom.xml index dabe325..dc40732 100644 --- a/sync-gather/pom.xml +++ b/sync-gather/pom.xml @@ -28,10 +28,11 @@ sync-framework + - org.projectlombok - lombok - provided + com.lyr + sync-mq-starter + 1.0.0 diff --git a/sync-gather/src/main/java/com/lyr/gather/kis/domain/model/TSupplier.java b/sync-gather/src/main/java/com/lyr/gather/kis/domain/model/TSupplier.java index c35eaf6..b089830 100644 --- a/sync-gather/src/main/java/com/lyr/gather/kis/domain/model/TSupplier.java +++ b/sync-gather/src/main/java/com/lyr/gather/kis/domain/model/TSupplier.java @@ -100,7 +100,7 @@ public class TSupplier implements Serializable { private Integer fOtherApAcctId; private Integer fPreArAcctId; private String fHelpCode; - private Timestamp fModifyTime; + private byte[] fModifyTime; private Integer fCreditDegree; private String fRightUserId; private Integer fPaymentTime; diff --git a/sync-gather/src/main/java/com/lyr/gather/kis/domain/vo/TSupplierVO.java b/sync-gather/src/main/java/com/lyr/gather/kis/domain/vo/TSupplierVO.java index 0a5a421..ef292ee 100644 --- a/sync-gather/src/main/java/com/lyr/gather/kis/domain/vo/TSupplierVO.java +++ b/sync-gather/src/main/java/com/lyr/gather/kis/domain/vo/TSupplierVO.java @@ -103,7 +103,7 @@ public class TSupplierVO implements Serializable { private Integer fOtherApAcctId; private Integer fPreArAcctId; private String fHelpCode; - private Timestamp fModifyTime; + private byte[] fModifyTime; private Integer fCreditDegree; private String fRightUserId; private Integer fPaymentTime; diff --git a/sync-gather/src/main/java/com/lyr/gather/kis/mapper/TSupplierMapper.java b/sync-gather/src/main/java/com/lyr/gather/kis/mapper/TSupplierMapper.java index c623377..723706b 100644 --- a/sync-gather/src/main/java/com/lyr/gather/kis/mapper/TSupplierMapper.java +++ b/sync-gather/src/main/java/com/lyr/gather/kis/mapper/TSupplierMapper.java @@ -15,15 +15,6 @@ public interface TSupplierMapper extends BaseMapper { @DataSource(name = "#dataSource") - @Select("SELECT " + // - "s.FItemID, " + // - "s.FSaleMode, " + // - "s.FName, " + // - "s.FNumber, " + // - "s.FParentID, " + // - "s.FShortNumber, " + // - "s.FHelpCode, " + // - "s.FFullName " + // - "FROM t_Supplier s") + @Select("SELECT * FROM t_Supplier s") IPage page(@Param("page") Page page, @Param("dataSource") String dataSource); } diff --git a/sync-gather/src/main/java/com/lyr/gather/local/service/IConfigTaskService.java b/sync-gather/src/main/java/com/lyr/gather/local/service/IConfigTaskService.java index e932f60..d0c3533 100644 --- a/sync-gather/src/main/java/com/lyr/gather/local/service/IConfigTaskService.java +++ b/sync-gather/src/main/java/com/lyr/gather/local/service/IConfigTaskService.java @@ -2,6 +2,7 @@ package com.lyr.gather.local.service; import com.baomidou.mybatisplus.core.metadata.IPage; import com.github.yulichang.base.MPJBaseService; +import com.lyr.common.core.domain.ServiceResult; import com.lyr.gather.local.domain.model.ConfigTask; import com.lyr.gather.local.domain.req.ConfigTaskEditReq; import com.lyr.gather.local.domain.req.ConfigTaskReq; @@ -26,5 +27,5 @@ public interface IConfigTaskService extends MPJBaseService { int remove(Long[] ids); - int sync(String dataSource, String tableName); + ServiceResult sync(String dataSource, String tableName); } diff --git a/sync-gather/src/main/java/com/lyr/gather/local/service/impl/ConfigTaskServiceImpl.java b/sync-gather/src/main/java/com/lyr/gather/local/service/impl/ConfigTaskServiceImpl.java index c882669..848ddca 100644 --- a/sync-gather/src/main/java/com/lyr/gather/local/service/impl/ConfigTaskServiceImpl.java +++ b/sync-gather/src/main/java/com/lyr/gather/local/service/impl/ConfigTaskServiceImpl.java @@ -1,11 +1,14 @@ package com.lyr.gather.local.service.impl; +import com.alibaba.fastjson2.JSON; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.service.IService; -import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.github.yulichang.base.MPJBaseServiceImpl; import com.google.common.collect.ImmutableMap; +import com.lyr.common.annotation.Params; +import com.lyr.common.core.domain.ServiceResult; import com.lyr.common.core.redis.RedisCache; +import com.lyr.common.utils.bean.DataReconciliationService; import com.lyr.gather.kis.domain.model.TSupplier; import com.lyr.gather.kis.domain.strategy.SyncServicesStrategyContext; import com.lyr.gather.local.convert.ConfigTaskConvert; @@ -16,17 +19,24 @@ import com.lyr.gather.local.domain.vo.ConfigTaskInfoVO; import com.lyr.gather.local.domain.vo.ConfigTaskPageVO; import com.lyr.gather.local.mapper.ConfigTaskMapper; import com.lyr.gather.local.service.IConfigTaskService; +import com.lyr.mq.producer.RTopicPublish; +import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.stereotype.Service; +import org.springframework.util.CollectionUtils; import javax.annotation.Resource; import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; /** * @author liyc * @date 2024/11/5 * @description TODO **/ +@Slf4j @Service public class ConfigTaskServiceImpl extends MPJBaseServiceImpl implements IConfigTaskService { @@ -35,6 +45,8 @@ public class ConfigTaskServiceImpl extends MPJBaseServiceImpl page(ConfigTaskReq req) { @@ -70,12 +82,37 @@ public class ConfigTaskServiceImpl extends MPJBaseServiceImpl cacheSet = redisCache.getCacheSet(tableName, TSupplier.class); IService handler = syncServicesStrategyContext.getHandler(tableName); - List tSuppliers = handler.listByMap(ImmutableMap.of(handler.getClass().getName(), dataSource)); - long cacheList = redisCache.setCacheList(tableName, tSuppliers); - System.out.println("cacheList = " + cacheList); - return 0; + List target = handler.listByMap(ImmutableMap.of(handler.getClass().getName(), dataSource)); + if (CollectionUtils.isEmpty(cacheSet)) { + redisCache.setCacheSet(tableName, target.stream().collect(Collectors.toSet())); + } else { + List sourceCache = cacheSet.stream().collect(Collectors.toList()); + DataReconciliationService dataReconciliationService = new DataReconciliationService(); + Map> reconcileData = dataReconciliationService.reconcileData(sourceCache, target); + reconcileData.forEach((k, v) -> { + switch (k) { + case "新增": + rTopicPublish.publishMessage(JSON.toJSONString(v)); + break; + case "删除": + + break; + case "更新": + + break; + default: + break; + } + }); + } + + return ServiceResult.success(""); } diff --git a/sync-gather/target/generated-sources/annotations/com/lyr/gather/kis/convert/TSupplierConvertImpl.java b/sync-gather/target/generated-sources/annotations/com/lyr/gather/kis/convert/TSupplierConvertImpl.java index 47efa1c..3aac120 100644 --- a/sync-gather/target/generated-sources/annotations/com/lyr/gather/kis/convert/TSupplierConvertImpl.java +++ b/sync-gather/target/generated-sources/annotations/com/lyr/gather/kis/convert/TSupplierConvertImpl.java @@ -2,11 +2,12 @@ package com.lyr.gather.kis.convert; import com.lyr.gather.kis.domain.model.TSupplier; import com.lyr.gather.kis.domain.vo.TSupplierVO; +import java.util.Arrays; import javax.annotation.Generated; @Generated( value = "org.mapstruct.ap.MappingProcessor", - date = "2024-11-11T11:01:13+0800", + date = "2024-11-11T18:33:53+0800", comments = "version: 1.6.2, compiler: javac, environment: Java 1.8.0_392 (Amazon.com Inc.)" ) public class TSupplierConvertImpl implements TSupplierConvert { @@ -106,7 +107,10 @@ public class TSupplierConvertImpl implements TSupplierConvert { tSupplierVO.setFOtherApAcctId( bean.getFOtherApAcctId() ); tSupplierVO.setFPreArAcctId( bean.getFPreArAcctId() ); tSupplierVO.setFHelpCode( bean.getFHelpCode() ); - tSupplierVO.setFModifyTime( bean.getFModifyTime() ); + byte[] fModifyTime = bean.getFModifyTime(); + if ( fModifyTime != null ) { + tSupplierVO.setFModifyTime( Arrays.copyOf( fModifyTime, fModifyTime.length ) ); + } tSupplierVO.setFCreditDegree( bean.getFCreditDegree() ); tSupplierVO.setFRightUserId( bean.getFRightUserId() ); tSupplierVO.setFPaymentTime( bean.getFPaymentTime() ); @@ -211,7 +215,10 @@ public class TSupplierConvertImpl implements TSupplierConvert { tSupplier.setFOtherApAcctId( vo.getFOtherApAcctId() ); tSupplier.setFPreArAcctId( vo.getFPreArAcctId() ); tSupplier.setFHelpCode( vo.getFHelpCode() ); - tSupplier.setFModifyTime( vo.getFModifyTime() ); + byte[] fModifyTime = vo.getFModifyTime(); + if ( fModifyTime != null ) { + tSupplier.setFModifyTime( Arrays.copyOf( fModifyTime, fModifyTime.length ) ); + } tSupplier.setFCreditDegree( vo.getFCreditDegree() ); tSupplier.setFRightUserId( vo.getFRightUserId() ); tSupplier.setFPaymentTime( vo.getFPaymentTime() ); diff --git a/sync-gather/target/generated-sources/annotations/com/lyr/gather/local/convert/ConfigTaskConvertImpl.java b/sync-gather/target/generated-sources/annotations/com/lyr/gather/local/convert/ConfigTaskConvertImpl.java index bc83010..512f109 100644 --- a/sync-gather/target/generated-sources/annotations/com/lyr/gather/local/convert/ConfigTaskConvertImpl.java +++ b/sync-gather/target/generated-sources/annotations/com/lyr/gather/local/convert/ConfigTaskConvertImpl.java @@ -6,7 +6,7 @@ import javax.annotation.Generated; @Generated( value = "org.mapstruct.ap.MappingProcessor", - date = "2024-11-11T11:01:13+0800", + date = "2024-11-11T18:33:54+0800", comments = "version: 1.6.2, compiler: javac, environment: Java 1.8.0_392 (Amazon.com Inc.)" ) public class ConfigTaskConvertImpl implements ConfigTaskConvert { diff --git a/sync-gather/target/generated-sources/annotations/com/lyr/gather/local/convert/DataSourceConfigConvertImpl.java b/sync-gather/target/generated-sources/annotations/com/lyr/gather/local/convert/DataSourceConfigConvertImpl.java index 27c72b5..02094fd 100644 --- a/sync-gather/target/generated-sources/annotations/com/lyr/gather/local/convert/DataSourceConfigConvertImpl.java +++ b/sync-gather/target/generated-sources/annotations/com/lyr/gather/local/convert/DataSourceConfigConvertImpl.java @@ -8,7 +8,7 @@ import javax.annotation.Generated; @Generated( value = "org.mapstruct.ap.MappingProcessor", - date = "2024-11-11T11:01:13+0800", + date = "2024-11-11T18:33:53+0800", comments = "version: 1.6.2, compiler: javac, environment: Java 1.8.0_392 (Amazon.com Inc.)" ) public class DataSourceConfigConvertImpl implements DataSourceConfigConvert { diff --git a/sync-mq-starter/pom.xml b/sync-mq-starter/pom.xml new file mode 100644 index 0000000..865e7ec --- /dev/null +++ b/sync-mq-starter/pom.xml @@ -0,0 +1,59 @@ + + + + com.lyr + sync + 1.0.0 + + 4.0.0 + sync-mq-starter + + + 消息模块 + + + + + + org.springframework.boot + spring-boot-starter + + + org.redisson + redisson-spring-boot-starter + + + + org.springframework.boot + spring-boot-configuration-processor + true + + + + org.springframework.boot + spring-boot-autoconfigure + + + org.projectlombok + lombok + compile + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + + 1.8 + 1.8 + + + + + + \ No newline at end of file diff --git a/sync-mq-starter/src/main/java/com/lyr/mq/RssMqAutoConfiguration.java b/sync-mq-starter/src/main/java/com/lyr/mq/RssMqAutoConfiguration.java new file mode 100644 index 0000000..a4976e7 --- /dev/null +++ b/sync-mq-starter/src/main/java/com/lyr/mq/RssMqAutoConfiguration.java @@ -0,0 +1,86 @@ +package com.lyr.mq; + +import com.lyr.mq.config.RssMqProperties; +import com.lyr.mq.consumer.RTopicSubscribe; +import com.lyr.mq.producer.RTopicPublish; +import com.lyr.mq.service.RTopicSubscriberService; +import io.micrometer.core.instrument.util.StringUtils; +import lombok.extern.slf4j.Slf4j; +import org.redisson.Redisson; +import org.redisson.api.RLock; +import org.redisson.api.RedissonClient; +import org.redisson.config.Config; +import org.redisson.config.SingleServerConfig; +import org.redisson.config.TransportMode; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * @author liyc + * @date 2024/11/11 + * @description TODO + **/ + +@Configuration +@EnableConfigurationProperties(RssMqProperties.class) +@Slf4j +public class RssMqAutoConfiguration { + + + private RssMqProperties properties; + private RTopicSubscriberService rTopicSubscriberService; + private RedissonClient redissonClient; + + + @Autowired + public RssMqAutoConfiguration(RssMqProperties rssMqProperties, RTopicSubscriberService rTopicSubscriberService) { + this.properties = rssMqProperties; + this.rTopicSubscriberService = rTopicSubscriberService; + initRedissonClient(); + } + + + private void initRedissonClient() { + Config config = new Config(); + config.setTransportMode(TransportMode.NIO); + SingleServerConfig singleServerConfig = config.useSingleServer(); + String address = String.format("redis://%s:%s", properties.getHost(), properties.getPort()); + singleServerConfig.setAddress(address); + if (StringUtils.isNotEmpty(properties.getPassword())) { + singleServerConfig.setPassword(properties.getPassword()); + } + RedissonClient redisson = Redisson.create(config); + // 测试连接 + RLock testLock = redisson.getLock("test_lock"); + boolean b = testLock.tryLock(); + log.info("redis连接测试结果:{}", b); + redisson.getLock("test_lock").unlock(); + this.redissonClient = redisson; + } + + @Bean + public RTopicPublish rTopicPublish() { + RTopicPublish publish = new RTopicPublish(); + publish.setRedissonClient(redissonClient); + publish.setProperties(properties); + publish.setSubscriberService(rTopicSubscriberService); + return publish; + } + + @Bean + public RTopicSubscribe rTopicSubscribe() { + RTopicSubscribe subscribe = new RTopicSubscribe(); + subscribe.setRedissonClient(redissonClient); + subscribe.setProperties(properties); + subscribe.setSubscriberService(rTopicSubscriberService); + return subscribe; + } + + @Bean(name = "rTopicSubscriberService") + public RTopicSubscriberService subscriberService() { + return new RTopicSubscriberService(); + } + +} diff --git a/sync-mq-starter/src/main/java/com/lyr/mq/config/RssMqProperties.java b/sync-mq-starter/src/main/java/com/lyr/mq/config/RssMqProperties.java new file mode 100644 index 0000000..f2ba638 --- /dev/null +++ b/sync-mq-starter/src/main/java/com/lyr/mq/config/RssMqProperties.java @@ -0,0 +1,26 @@ +package com.lyr.mq.config; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +/** + * @author liyc + * @date 2024/11/11 + * @description TODO + **/ +@Data +@Component +@ConfigurationProperties(prefix = "rss.mq") +public class RssMqProperties { + + private String topicName; + private String confirmTopicName; + private String subscriberChannelName; + private String host; + private Integer port; + private Integer database; + private String password; + + +} diff --git a/sync-mq-starter/src/main/java/com/lyr/mq/consumer/RTopicSubscribe.java b/sync-mq-starter/src/main/java/com/lyr/mq/consumer/RTopicSubscribe.java new file mode 100644 index 0000000..bae2284 --- /dev/null +++ b/sync-mq-starter/src/main/java/com/lyr/mq/consumer/RTopicSubscribe.java @@ -0,0 +1,42 @@ +package com.lyr.mq.consumer; + +import com.lyr.mq.config.RssMqProperties; +import com.lyr.mq.service.RTopicSubscriberService; +import lombok.extern.slf4j.Slf4j; +import org.redisson.api.RTopic; +import org.redisson.api.RedissonClient; +import org.redisson.api.listener.MessageListener; + +/** + * @author liyc + * @date 2024/11/11 + * @description TODO + **/ +@Slf4j +public class RTopicSubscribe { + + + private RedissonClient redissonClient; + private RssMqProperties properties; + private RTopicSubscriberService RTopicSubscriberService; + + public void setRedissonClient(RedissonClient redissonClient) { + this.redissonClient = redissonClient; + } + + public void setProperties(RssMqProperties properties) { + this.properties = properties; + } + + public void setSubscriberService(RTopicSubscriberService RTopicSubscriberService) { + this.RTopicSubscriberService = RTopicSubscriberService; + } + + public void startListening() { + RTopic topic = redissonClient.getTopic(properties.getTopicName()); + topic.addListener(String.class, (MessageListener) (channel, msg) -> { + RTopicSubscriberService.handleMessage(msg); + }); + } + +} diff --git a/sync-mq-starter/src/main/java/com/lyr/mq/producer/RTopicPublish.java b/sync-mq-starter/src/main/java/com/lyr/mq/producer/RTopicPublish.java new file mode 100644 index 0000000..74566dc --- /dev/null +++ b/sync-mq-starter/src/main/java/com/lyr/mq/producer/RTopicPublish.java @@ -0,0 +1,41 @@ +package com.lyr.mq.producer; + +import com.lyr.mq.config.RssMqProperties; +import com.lyr.mq.service.RTopicSubscriberService; +import lombok.extern.slf4j.Slf4j; +import org.redisson.api.RTopic; +import org.redisson.api.RedissonClient; +import org.springframework.context.annotation.Primary; +import org.springframework.stereotype.Component; + +/** + * @author liyc + * @date 2024/11/11 + * @description TODO + **/ +@Slf4j +@Component +@Primary +public class RTopicPublish { + private RedissonClient redissonClient; + private RssMqProperties properties; + private RTopicSubscriberService rTopicSubscriberService; + + public void setRedissonClient(RedissonClient redissonClient) { + this.redissonClient = redissonClient; + } + + public void setProperties(RssMqProperties properties) { + this.properties = properties; + } + + public void setSubscriberService(RTopicSubscriberService rTopicSubscriberService) { + this.rTopicSubscriberService = rTopicSubscriberService; + } + + public void publishMessage(String message) { + RTopic topic = redissonClient.getTopic(properties.getTopicName()); + topic.publish(message); + } + +} diff --git a/sync-mq-starter/src/main/java/com/lyr/mq/service/RTopicSubscriberService.java b/sync-mq-starter/src/main/java/com/lyr/mq/service/RTopicSubscriberService.java new file mode 100644 index 0000000..c2aeb25 --- /dev/null +++ b/sync-mq-starter/src/main/java/com/lyr/mq/service/RTopicSubscriberService.java @@ -0,0 +1,16 @@ +package com.lyr.mq.service; + +import org.springframework.stereotype.Component; + +/** + * @author liyc + * @date 2024/11/11 + * @description TODO + **/ +@Component +public class RTopicSubscriberService { + public void handleMessage(String message) { + System.out.println("Received message: " + message); + // 处理消息的逻辑 + } +} diff --git a/sync-mq-starter/src/main/resources/META-INF/spring.factories b/sync-mq-starter/src/main/resources/META-INF/spring.factories new file mode 100644 index 0000000..d083d30 --- /dev/null +++ b/sync-mq-starter/src/main/resources/META-INF/spring.factories @@ -0,0 +1 @@ +org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.lyr.mq.RssMqAutoConfiguration \ No newline at end of file