feat(sync-gather): 实现数据同步功能

- 新增数据同步方法,支持增量同步
-增加数据校验和处理逻辑
- 集成消息队列服务,支持异步消息处理
- 优化 Redis 缓存操作
- 添加参数验证注解和切面
dev
1iyc 2 weeks ago
parent 6bfef02b9b
commit edbb311f00

@ -234,6 +234,7 @@
<module>sync-generator</module>
<module>sync-common</module>
<module>sync-gather</module>
<module>sync-mq</module>
</modules>
<packaging>pom</packaging>

@ -0,0 +1,25 @@
package com.lyr.common.annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Params {
Valid[] value();
@Target(ElementType.ANNOTATION_TYPE)
@Retention(RetentionPolicy.RUNTIME)
@interface Valid {
String value();
String message();
}
}

@ -1,5 +1,6 @@
package com.lyr.common.core.redis;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.BoundSetOperations;
@ -10,6 +11,7 @@ import org.springframework.stereotype.Component;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* spring redis
@ -177,7 +179,12 @@ public class RedisCache {
* @param key
* @return
*/
public <T> Set<T> getCacheSet(final String key) {
public <T> Set<T> getCacheSet(final String key, Class<T>... clazz) {
Set members = redisTemplate.opsForSet().members(key);
if (clazz.length > 0) {
Set<T> collect = (Set<T>) members.stream().map(o -> JSON.parseObject(JSON.toJSONString(o), clazz[0])).collect(Collectors.toSet());
return collect;
}
return redisTemplate.opsForSet().members(key);
}

@ -0,0 +1,139 @@
package com.lyr.common.utils.bean;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
import java.lang.reflect.Field;
import java.util.*;
@Slf4j
public class DataReconciliationService<T> {
// 存储对账结果
private Map<String, List<T>> reconciliationResult = new HashMap<>();
public DataReconciliationService() {
reconciliationResult.put("新增", new ArrayList<>());
reconciliationResult.put("删除", new ArrayList<>());
reconciliationResult.put("修改", new ArrayList<>());
}
/**
*
*
* @param sourceData
* @param targetData
* @return
*/
public Map<String, List<T>> reconcileData(List<T> sourceData, List<T> targetData) {
if (sourceData == null || targetData == null) {
throw new IllegalArgumentException("Source and target data cannot be null");
}
if (sourceData.isEmpty() || targetData.isEmpty()) {
return reconciliationResult;
}
Map<String, T> sourceMap = null;
Map<String, T> targetMap = null;
for (Field field : sourceData.get(0).getClass().getDeclaredFields()) {
String idFieldName = field.getName();
sourceMap = toMap(sourceData, idFieldName);
targetMap = toMap(targetData, idFieldName);
}
// 处理新增数据
for (String id : targetMap.keySet()) {
if (!sourceMap.containsKey(id)) {
reconciliationResult.get("新增").add(targetMap.get(id));
}
}
// 处理删除数据
for (String id : sourceMap.keySet()) {
if (!targetMap.containsKey(id)) {
reconciliationResult.get("删除").add(sourceMap.get(id));
}
}
// 处理修改数据
for (String id : sourceMap.keySet()) {
if (targetMap.containsKey(id)) {
T sourceRecord = sourceMap.get(id);
T targetRecord = targetMap.get(id);
String result = compareRecords(sourceRecord, targetRecord);
if (!"All attributes match".equals(result)) {
reconciliationResult.get("修改").add(sourceMap.get(id));
}
}
}
// 记录对账结果
log.info("Reconciliation result: {}", reconciliationResult);
return reconciliationResult;
}
/**
* Map
*
* @param data
* @param idFieldName
* @return Map
*/
private Map<String, T> toMap(List<T> data, String idFieldName) {
Map<String, T> map = new HashMap<>();
for (T record : data) {
try {
Field idField = record.getClass().getDeclaredField(idFieldName);
idField.setAccessible(true);
Object object = idField.get(record);
if (ObjectUtils.isNotEmpty(object)) {
String id = object.toString();
map.put(id, record);
}
} catch (NoSuchFieldException | IllegalAccessException e) {
log.error("Error converting data to map: {}", e.getMessage());
}
}
return map;
}
/**
*
*
* @param source
* @param target
* @return
*/
private String compareRecords(T source, T target) {
StringBuilder result = new StringBuilder();
Class<?> clazz = source.getClass();
Field[] fields = clazz.getDeclaredFields();
for (Field field : fields) {
field.setAccessible(true);
try {
Object sourceValue = field.get(source);
Object targetValue = field.get(target);
if (!Objects.equals(sourceValue, targetValue)) {
result.append(field.getName()).append(" mismatch: ").append(sourceValue).append(" vs ").append(targetValue).append(", ");
}
} catch (IllegalAccessException e) {
log.error("Error accessing field: {}", e.getMessage());
}
}
if (result.length() > 0) {
result.setLength(result.length() - 2); // 去掉最后一个逗号和空格
} else {
result.append("All attributes match");
}
return result.toString();
}
}

@ -0,0 +1,81 @@
package com.lyr.framework.aspectj;
import com.lyr.common.annotation.Params;
import com.lyr.common.utils.AssertUtil;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import javax.validation.Validator;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
@Aspect
@Component
@Slf4j
public class ParameterValidationAspect {
@Before("@annotation(params)")
public void validateParameters(JoinPoint joinPoint, Params params) {
MethodSignature signature = (MethodSignature) joinPoint.getSignature();
Method method = signature.getMethod();
Object[] args = joinPoint.getArgs();
for (Params.Valid valid : params.value()) {
Parameter[] parameters = method.getParameters();
for (int i = 0; i < parameters.length; i++) {
Parameter parameter = parameters[i];
if (parameter.getName().equals(valid.value())) {
ParameterValidator parameterValidator = getValidator(parameter.getType());
parameterValidator.validate(args[i], valid.message());
}
}
}
}
public interface ParameterValidator {
void validate(Object arg, String message);
}
@Resource
private Validator validator;
private ParameterValidator getValidator(Class<?> clazz) {
if (String.class.isAssignableFrom(clazz)) {
log.debug("String 类型参数非空判断");
return (arg, message) -> AssertUtil.notEmpty(String.valueOf(arg), message);
} else if (Number.class.isAssignableFrom(clazz)) {
log.debug("Number 类型参数非空判断");
return (arg, message) -> {
};
} else if (Boolean.class.isAssignableFrom(clazz)) {
log.debug("Boolean 类型参数非空判断");
return (arg, message) -> {
};
} else if (clazz.isEnum()) {
log.debug("Enum 类型参数非空判断");
return (arg, message) -> {
};
} else if (clazz.isArray()) {
log.debug("数组类型参数非空判断");
return (arg, message) -> {
};
} else {
// 默认的验证器,处理未列出的类型
return (arg, message) -> {
log.warn("Unsupported parameter type: {}", clazz.getName());
// 可以在这里添加通用的验证逻辑
};
}
}
}

@ -10,6 +10,7 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.SerializationException;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@ -30,13 +31,14 @@ public class RedisConfig extends CachingConfigurerSupport {
FastJson2JsonRedisSerializer serializer = new FastJson2JsonRedisSerializer(Object.class);
// 使用StringRedisSerializer来序列化和反序列化redis的key值
template.setKeySerializer(new StringRedisSerializer());
template.setKeySerializer(new StringRedisSerializerEx());
template.setValueSerializer(serializer);
// Hash的key也采用StringRedisSerializer的序列化方式
template.setHashKeySerializer(new StringRedisSerializer());
template.setHashKeySerializer(new StringRedisSerializerEx());
template.setHashValueSerializer(serializer);
template.afterPropertiesSet();
return template;
}

@ -28,10 +28,11 @@
<artifactId>sync-framework</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
<groupId>com.lyr</groupId>
<artifactId>sync-mq-starter</artifactId>
<version>1.0.0</version>
</dependency>

@ -100,7 +100,7 @@ public class TSupplier implements Serializable {
private Integer fOtherApAcctId;
private Integer fPreArAcctId;
private String fHelpCode;
private Timestamp fModifyTime;
private byte[] fModifyTime;
private Integer fCreditDegree;
private String fRightUserId;
private Integer fPaymentTime;

@ -103,7 +103,7 @@ public class TSupplierVO implements Serializable {
private Integer fOtherApAcctId;
private Integer fPreArAcctId;
private String fHelpCode;
private Timestamp fModifyTime;
private byte[] fModifyTime;
private Integer fCreditDegree;
private String fRightUserId;
private Integer fPaymentTime;

@ -15,15 +15,6 @@ public interface TSupplierMapper extends BaseMapper<TSupplier> {
@DataSource(name = "#dataSource")
@Select("SELECT " + //
"s.FItemID, " + //
"s.FSaleMode, " + //
"s.FName, " + //
"s.FNumber, " + //
"s.FParentID, " + //
"s.FShortNumber, " + //
"s.FHelpCode, " + //
"s.FFullName " + //
"FROM t_Supplier s")
@Select("SELECT * FROM t_Supplier s")
IPage<TSupplier> page(@Param("page") Page page, @Param("dataSource") String dataSource);
}

@ -2,6 +2,7 @@ package com.lyr.gather.local.service;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.github.yulichang.base.MPJBaseService;
import com.lyr.common.core.domain.ServiceResult;
import com.lyr.gather.local.domain.model.ConfigTask;
import com.lyr.gather.local.domain.req.ConfigTaskEditReq;
import com.lyr.gather.local.domain.req.ConfigTaskReq;
@ -26,5 +27,5 @@ public interface IConfigTaskService extends MPJBaseService<ConfigTask> {
int remove(Long[] ids);
int sync(String dataSource, String tableName);
ServiceResult sync(String dataSource, String tableName);
}

@ -1,11 +1,14 @@
package com.lyr.gather.local.service.impl;
import com.alibaba.fastjson2.JSON;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.service.IService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.github.yulichang.base.MPJBaseServiceImpl;
import com.google.common.collect.ImmutableMap;
import com.lyr.common.annotation.Params;
import com.lyr.common.core.domain.ServiceResult;
import com.lyr.common.core.redis.RedisCache;
import com.lyr.common.utils.bean.DataReconciliationService;
import com.lyr.gather.kis.domain.model.TSupplier;
import com.lyr.gather.kis.domain.strategy.SyncServicesStrategyContext;
import com.lyr.gather.local.convert.ConfigTaskConvert;
@ -16,17 +19,24 @@ import com.lyr.gather.local.domain.vo.ConfigTaskInfoVO;
import com.lyr.gather.local.domain.vo.ConfigTaskPageVO;
import com.lyr.gather.local.mapper.ConfigTaskMapper;
import com.lyr.gather.local.service.IConfigTaskService;
import com.lyr.mq.producer.RTopicPublish;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
/**
* @author liyc
* @date 2024/11/5
* @description TODO
**/
@Slf4j
@Service
public class ConfigTaskServiceImpl extends MPJBaseServiceImpl<ConfigTaskMapper, ConfigTask> implements IConfigTaskService {
@ -35,6 +45,8 @@ public class ConfigTaskServiceImpl extends MPJBaseServiceImpl<ConfigTaskMapper,
@Resource
private RedisCache redisCache;
@Resource
private RTopicPublish rTopicPublish;
@Override
public IPage<ConfigTaskPageVO> page(ConfigTaskReq req) {
@ -70,12 +82,37 @@ public class ConfigTaskServiceImpl extends MPJBaseServiceImpl<ConfigTaskMapper,
}
@Override
public int sync(String dataSource, String tableName) {
@Params({@Params.Valid(value = "dataSource", message = "数据源名称不能为空"), //
@Params.Valid(value = "tableName", message = "数据源名称不能为空") //
})
public ServiceResult sync(String dataSource, String tableName) {
Set<TSupplier> cacheSet = redisCache.getCacheSet(tableName, TSupplier.class);
IService<TSupplier> handler = syncServicesStrategyContext.getHandler(tableName);
List<TSupplier> tSuppliers = handler.listByMap(ImmutableMap.of(handler.getClass().getName(), dataSource));
long cacheList = redisCache.setCacheList(tableName, tSuppliers);
System.out.println("cacheList = " + cacheList);
return 0;
List<TSupplier> target = handler.listByMap(ImmutableMap.of(handler.getClass().getName(), dataSource));
if (CollectionUtils.isEmpty(cacheSet)) {
redisCache.setCacheSet(tableName, target.stream().collect(Collectors.toSet()));
} else {
List<TSupplier> sourceCache = cacheSet.stream().collect(Collectors.toList());
DataReconciliationService dataReconciliationService = new DataReconciliationService<TSupplier>();
Map<String, List<TSupplier>> reconcileData = dataReconciliationService.reconcileData(sourceCache, target);
reconcileData.forEach((k, v) -> {
switch (k) {
case "新增":
rTopicPublish.publishMessage(JSON.toJSONString(v));
break;
case "删除":
break;
case "更新":
break;
default:
break;
}
});
}
return ServiceResult.success("");
}

@ -2,11 +2,12 @@ package com.lyr.gather.kis.convert;
import com.lyr.gather.kis.domain.model.TSupplier;
import com.lyr.gather.kis.domain.vo.TSupplierVO;
import java.util.Arrays;
import javax.annotation.Generated;
@Generated(
value = "org.mapstruct.ap.MappingProcessor",
date = "2024-11-11T11:01:13+0800",
date = "2024-11-11T18:33:53+0800",
comments = "version: 1.6.2, compiler: javac, environment: Java 1.8.0_392 (Amazon.com Inc.)"
)
public class TSupplierConvertImpl implements TSupplierConvert {
@ -106,7 +107,10 @@ public class TSupplierConvertImpl implements TSupplierConvert {
tSupplierVO.setFOtherApAcctId( bean.getFOtherApAcctId() );
tSupplierVO.setFPreArAcctId( bean.getFPreArAcctId() );
tSupplierVO.setFHelpCode( bean.getFHelpCode() );
tSupplierVO.setFModifyTime( bean.getFModifyTime() );
byte[] fModifyTime = bean.getFModifyTime();
if ( fModifyTime != null ) {
tSupplierVO.setFModifyTime( Arrays.copyOf( fModifyTime, fModifyTime.length ) );
}
tSupplierVO.setFCreditDegree( bean.getFCreditDegree() );
tSupplierVO.setFRightUserId( bean.getFRightUserId() );
tSupplierVO.setFPaymentTime( bean.getFPaymentTime() );
@ -211,7 +215,10 @@ public class TSupplierConvertImpl implements TSupplierConvert {
tSupplier.setFOtherApAcctId( vo.getFOtherApAcctId() );
tSupplier.setFPreArAcctId( vo.getFPreArAcctId() );
tSupplier.setFHelpCode( vo.getFHelpCode() );
tSupplier.setFModifyTime( vo.getFModifyTime() );
byte[] fModifyTime = vo.getFModifyTime();
if ( fModifyTime != null ) {
tSupplier.setFModifyTime( Arrays.copyOf( fModifyTime, fModifyTime.length ) );
}
tSupplier.setFCreditDegree( vo.getFCreditDegree() );
tSupplier.setFRightUserId( vo.getFRightUserId() );
tSupplier.setFPaymentTime( vo.getFPaymentTime() );

@ -6,7 +6,7 @@ import javax.annotation.Generated;
@Generated(
value = "org.mapstruct.ap.MappingProcessor",
date = "2024-11-11T11:01:13+0800",
date = "2024-11-11T18:33:54+0800",
comments = "version: 1.6.2, compiler: javac, environment: Java 1.8.0_392 (Amazon.com Inc.)"
)
public class ConfigTaskConvertImpl implements ConfigTaskConvert {

@ -8,7 +8,7 @@ import javax.annotation.Generated;
@Generated(
value = "org.mapstruct.ap.MappingProcessor",
date = "2024-11-11T11:01:13+0800",
date = "2024-11-11T18:33:53+0800",
comments = "version: 1.6.2, compiler: javac, environment: Java 1.8.0_392 (Amazon.com Inc.)"
)
public class DataSourceConfigConvertImpl implements DataSourceConfigConvert {

@ -0,0 +1,59 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>com.lyr</groupId>
<artifactId>sync</artifactId>
<version>1.0.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>sync-mq-starter</artifactId>
<description>
消息模块
</description>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
</dependency>
<!-- 在编译时会自动收集配置类的条件写到一个META-INF/spring-autoconfigure-metadata.json中-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<!-- 提供了自动装配功能-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>compile</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>

@ -0,0 +1,86 @@
package com.lyr.mq;
import com.lyr.mq.config.RssMqProperties;
import com.lyr.mq.consumer.RTopicSubscribe;
import com.lyr.mq.producer.RTopicPublish;
import com.lyr.mq.service.RTopicSubscriberService;
import io.micrometer.core.instrument.util.StringUtils;
import lombok.extern.slf4j.Slf4j;
import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.config.SingleServerConfig;
import org.redisson.config.TransportMode;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author liyc
* @date 2024/11/11
* @description TODO
**/
@Configuration
@EnableConfigurationProperties(RssMqProperties.class)
@Slf4j
public class RssMqAutoConfiguration {
private RssMqProperties properties;
private RTopicSubscriberService rTopicSubscriberService;
private RedissonClient redissonClient;
@Autowired
public RssMqAutoConfiguration(RssMqProperties rssMqProperties, RTopicSubscriberService rTopicSubscriberService) {
this.properties = rssMqProperties;
this.rTopicSubscriberService = rTopicSubscriberService;
initRedissonClient();
}
private void initRedissonClient() {
Config config = new Config();
config.setTransportMode(TransportMode.NIO);
SingleServerConfig singleServerConfig = config.useSingleServer();
String address = String.format("redis://%s:%s", properties.getHost(), properties.getPort());
singleServerConfig.setAddress(address);
if (StringUtils.isNotEmpty(properties.getPassword())) {
singleServerConfig.setPassword(properties.getPassword());
}
RedissonClient redisson = Redisson.create(config);
// 测试连接
RLock testLock = redisson.getLock("test_lock");
boolean b = testLock.tryLock();
log.info("redis连接测试结果{}", b);
redisson.getLock("test_lock").unlock();
this.redissonClient = redisson;
}
@Bean
public RTopicPublish rTopicPublish() {
RTopicPublish publish = new RTopicPublish();
publish.setRedissonClient(redissonClient);
publish.setProperties(properties);
publish.setSubscriberService(rTopicSubscriberService);
return publish;
}
@Bean
public RTopicSubscribe rTopicSubscribe() {
RTopicSubscribe subscribe = new RTopicSubscribe();
subscribe.setRedissonClient(redissonClient);
subscribe.setProperties(properties);
subscribe.setSubscriberService(rTopicSubscriberService);
return subscribe;
}
@Bean(name = "rTopicSubscriberService")
public RTopicSubscriberService subscriberService() {
return new RTopicSubscriberService();
}
}

@ -0,0 +1,26 @@
package com.lyr.mq.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
* @author liyc
* @date 2024/11/11
* @description TODO
**/
@Data
@Component
@ConfigurationProperties(prefix = "rss.mq")
public class RssMqProperties {
private String topicName;
private String confirmTopicName;
private String subscriberChannelName;
private String host;
private Integer port;
private Integer database;
private String password;
}

@ -0,0 +1,42 @@
package com.lyr.mq.consumer;
import com.lyr.mq.config.RssMqProperties;
import com.lyr.mq.service.RTopicSubscriberService;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.redisson.api.listener.MessageListener;
/**
* @author liyc
* @date 2024/11/11
* @description TODO
**/
@Slf4j
public class RTopicSubscribe {
private RedissonClient redissonClient;
private RssMqProperties properties;
private RTopicSubscriberService RTopicSubscriberService;
public void setRedissonClient(RedissonClient redissonClient) {
this.redissonClient = redissonClient;
}
public void setProperties(RssMqProperties properties) {
this.properties = properties;
}
public void setSubscriberService(RTopicSubscriberService RTopicSubscriberService) {
this.RTopicSubscriberService = RTopicSubscriberService;
}
public void startListening() {
RTopic topic = redissonClient.getTopic(properties.getTopicName());
topic.addListener(String.class, (MessageListener<String>) (channel, msg) -> {
RTopicSubscriberService.handleMessage(msg);
});
}
}

@ -0,0 +1,41 @@
package com.lyr.mq.producer;
import com.lyr.mq.config.RssMqProperties;
import com.lyr.mq.service.RTopicSubscriberService;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Component;
/**
* @author liyc
* @date 2024/11/11
* @description TODO
**/
@Slf4j
@Component
@Primary
public class RTopicPublish {
private RedissonClient redissonClient;
private RssMqProperties properties;
private RTopicSubscriberService rTopicSubscriberService;
public void setRedissonClient(RedissonClient redissonClient) {
this.redissonClient = redissonClient;
}
public void setProperties(RssMqProperties properties) {
this.properties = properties;
}
public void setSubscriberService(RTopicSubscriberService rTopicSubscriberService) {
this.rTopicSubscriberService = rTopicSubscriberService;
}
public void publishMessage(String message) {
RTopic topic = redissonClient.getTopic(properties.getTopicName());
topic.publish(message);
}
}

@ -0,0 +1,16 @@
package com.lyr.mq.service;
import org.springframework.stereotype.Component;
/**
* @author liyc
* @date 2024/11/11
* @description TODO
**/
@Component
public class RTopicSubscriberService {
public void handleMessage(String message) {
System.out.println("Received message: " + message);
// 处理消息的逻辑
}
}

@ -0,0 +1 @@
org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.lyr.mq.RssMqAutoConfiguration
Loading…
Cancel
Save