提交 909f30ca 作者: obcy

添加重试工具

上级 611e4a1d
package com.zzsn.code.base.core.util.processutil;
import com.zzsn.code.base.core.util.collectionutils.CollectionUtils;
import com.zzsn.code.base.core.util.objutil.ObjectUtils;
import java.util.*;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.logging.Logger;
/**
* Description:
*
* @author: EDY
* @since: 2023/12/28
*/
public class ProcessUtil {
private static final Logger logger = Logger.getLogger(ProcessUtil.class.getName());
/**计数器,记录每一个任务的失败次数*/
Map<String,Integer> businessCount = new HashMap<>();
/**任务失败集合,所有重试都失败后,任务id会存放到对应key的set集合中*/
Map<String, Set<String>> errorBus = new HashMap<>();
/**获取所有失败的业务id*/
public Map<String, Set<String>> getErrorBus() {
return errorBus;
}
/**根据指定的key获取失败的业务id*/
public Set<String> getErrorBusByErrorKey(String errorKey) {
return errorBus.get(errorKey);
}
/*******************************有参有返回值实现*********************************/
/** bussinessId 本次处理的任务id
* dto 本次任务的参数
* processFunction 本次任务的处理逻辑
* sign为重试标识 1重试 0正常
* redisKey 错误的业务id的map中的key
* */
public <T, R> R processLocal (String bussinessId, T parameter , Function<T, R> action, String localKey){
return doProcessLocal(bussinessId,parameter,0,action,localKey,3);
}
/** bussinessId 本次处理的任务id
* dto 本次任务的参数
* processFunction 本次任务的处理逻辑
* sign为重试标识 1重试 0正常*/
public <T, R> R processLocal (String bussinessId,T parameter ,Function<T, R> action){
return doProcessLocal(bussinessId,parameter,0,action,"def-process-errors-local",3);
}
/**
* dto 本次任务的参数
* processFunction 本次任务的处理逻辑
* localKey错误任务id的存放key
* sign为重试标识 1重试 0正常*/
public <T, R> R processLocal (T parameter ,Function<T, R> action,String localKey){
return doProcessLocal(UUID.randomUUID().toString(),parameter,0,action,localKey,3);
}
/**
* dto 本次任务的参数
* processFunction 本次任务的处理逻辑
* sign为重试标识 1重试 0正常*/
public <T, R> R processLocal (T parameter ,Function<T, R> action){
return doProcessLocal(UUID.randomUUID().toString(),parameter,0,action,"def-process-errors-local",3);
}
/** bussinessId 本次处理的任务id
* dto 本次任务的参数
* processFunction 本次任务的处理逻辑
* sign为重试标识 1重试 0正常
* retrty 重试次数
* */
public <T, R> R processLocalRetry(String bussinessId, T parameter ,Function<T, R> action,String localKey,int retry){
return doProcessLocal(bussinessId,parameter,0,action,localKey,retry);
}
/** bussinessId 本次处理的任务id
* dto 本次任务的参数
* processFunction 本次任务的处理逻辑
* sign为重试标识 1重试 0正常
* redisKey 错误的业务id的redis中的key
* */
private <T, R> R doProcessLocal (String bussinessId, T parameter , int sign, Function<T, R> action, String localKey, int retry){
long startTime = System.currentTimeMillis();
R res = null;
try {
res = action.apply(parameter);
if (1 == sign) {
businessCount.remove(bussinessId);
}
long endTime = System.currentTimeMillis();
logger.info(bussinessId + "任务处理结束===用时=>"+ (endTime - startTime)+"毫秒");
} catch (Exception e) {
logger.warning(bussinessId+"任务处理异常=="+e.getMessage());
Integer num = businessCount.get(bussinessId);
if(ObjectUtils.isNotEmpty(num)&&num == retry){
if (CollectionUtils.isEmpty(errorBus.get(localKey))){
errorBus.put(localKey,new HashSet<>());
}
errorBus.get(localKey).add(bussinessId);
businessCount.remove(bussinessId);
logger.warning(bussinessId+"重试处理"+retry+"次失败");
}else {
Integer numm = businessCount.get(bussinessId);
if (ObjectUtils.isEmpty(num)){
numm= 0;
}
businessCount.put(bussinessId,numm+1);
doProcessLocal(bussinessId,parameter,1,action,localKey,retry);
}
}
return res;
}
/****************************************************************/
/*******************************有参无返回值实现--消费型函数*********************************/
/**
* dto 本次任务的参数
* processFunction 本次任务的处理逻辑
* localKey错误任务id的存放key
* sign为重试标识 1重试 0正常*/
public <T> void processLocal (T parameter , Consumer<T> action, String localKey){
doProcessLocal(UUID.randomUUID().toString(),parameter,0,action,localKey,3);
}
/**
* dto 本次任务的参数
* processFunction 本次任务的处理逻辑
* sign为重试标识 1重试 0正常*/
public <T> void processLocal (T parameter ,Consumer<T> action){
doProcessLocal(UUID.randomUUID().toString(),parameter,0,action,"def-process-errors-local",5);
}
/** bussinessId 本次处理的任务id
* dto 本次任务的参数
* processFunction 本次任务的处理逻辑
* sign为重试标识 1重试 0正常
* redisKey 错误的业务id的map中的key
* */
public <T> void processLocal (String bussinessId, T dto , Consumer<T> action, String localKey){
doProcessLocal(bussinessId,dto,0,action,localKey,3);
}
/** bussinessId 本次处理的任务id
* dto 本次任务的参数
* processFunction 本次任务的处理逻辑
* sign为重试标识 1重试 0正常*/
public <T> void processLocal (String bussinessId, T dto ,Consumer<T> action){
doProcessLocal(bussinessId,dto,0,action,"def-process-errors-local",3);
}
/** bussinessId 本次处理的任务id
* dto 本次任务的参数
* processFunction 本次任务的处理逻辑
* sign为重试标识 1重试 0正常
* retrty 重试次数
* */
public <T> void processLocalRetry(String bussinessId, T dto ,Consumer<T> action,String localKey,int retry){
doProcessLocal(bussinessId,dto,0,action,localKey,retry);
}
/** bussinessId 本次处理的任务id
* dto 本次任务的参数
* processFunction 本次任务的处理逻辑
* sign为重试标识 1重试 0正常
* redisKey 错误的业务id的redis中的key
* */
private <T> void doProcessLocal (String bussinessId, T dto , int sign,Consumer<T> action,String localKey,int retry){
long startTime = System.currentTimeMillis();
try {
action.accept(dto);
if (1 == sign) {
businessCount.remove(bussinessId);
}
long endTime = System.currentTimeMillis();
logger.info(bussinessId+"任务处理结束===用时=>"+(endTime - startTime)+"毫秒");
} catch (Exception e) {
logger.warning(bussinessId+"任务处理异常=="+e.getMessage());
Integer num = businessCount.get(bussinessId);
if(ObjectUtils.isNotEmpty(num)&&num == retry){
if (CollectionUtils.isEmpty(errorBus.get(localKey))){
errorBus.put(localKey,new HashSet<>());
}
errorBus.get(localKey).add(bussinessId);
businessCount.remove(bussinessId);
logger.warning(bussinessId+"重试处理"+retry+"次失败");
}else {
Integer numm = businessCount.get(bussinessId);
if (ObjectUtils.isEmpty(num)){
numm= 0;
}
businessCount.put(bussinessId,numm+1);
doProcessLocal(bussinessId,dto,1,action,localKey,retry);
}
}
}
}
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论