Java源码示例:org.apache.rocketmq.spring.core.RocketMQLocalTransactionState
示例1
/**
* 提交本地事务
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
//插入订单数据
String orderJson = new String(((byte[])message.getPayload()));
Order order = JSON.parseObject(orderJson, Order.class);
orderService.save(order);
String produceError = (String)message.getHeaders().get("produceError");
if ("1".equals(produceError)) {
System.err.println("============Exception:订单进程挂了,事务消息没提交");
//模拟插入订单后服务器挂了,没有commit事务消息
throw new RuntimeException("============订单服务器挂了");
}
//提交事务消息
return RocketMQLocalTransactionState.COMMIT;
}
示例2
/**
* 执行本地事务
* @param message 消息
* @param o 额外参数
* @return RocketMQ事务状态
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
MessageHeaders headers = message.getHeaders();
String transicationId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
try {
FwTradeLog tradeLog = (FwTradeLog) o;
orderService.payOrder(tradeLog,transicationId); // 对应图中第3步,执行本地事务
log.info("本地事务=>{} 执行成功,往RocketMQ发送COMMIT",transicationId);
return RocketMQLocalTransactionState.COMMIT; // 对应图中第4步,COMMIT,半消息经过COMMIT后,消息消费端就可以消费这条消息了
} catch (Exception e){
log.info("本地事务=>{} 回滚,往RocketMQ发送ROLLBACK",transicationId ,e);
return RocketMQLocalTransactionState.ROLLBACK; // 对应途中第4步,ROLLBACK
}
}
示例3
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
String transId = (String)msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
System.out.printf("#### executeLocalTransaction is executed, msgTransactionId=%s %n",
transId);
int value = transactionIndex.getAndIncrement();
int status = value % 3;
localTrans.put(transId, status);
if (status == 0) {
// Return local transaction with success(commit), in this case,
// this message will not be checked in checkLocalTransaction()
System.out.printf(" # COMMIT # Simulating msg %s related local transaction exec succeeded! ### %n", msg.getPayload());
return RocketMQLocalTransactionState.COMMIT;
}
if (status == 1) {
// Return local transaction with failure(rollback) , in this case,
// this message will not be checked in checkLocalTransaction()
System.out.printf(" # ROLLBACK # Simulating %s related local transaction exec failed! %n", msg.getPayload());
return RocketMQLocalTransactionState.ROLLBACK;
}
System.out.printf(" # UNKNOW # Simulating %s related local transaction exec UNKNOWN! \n");
return RocketMQLocalTransactionState.UNKNOWN;
}
示例4
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
String transId = (String)msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
RocketMQLocalTransactionState retState = RocketMQLocalTransactionState.COMMIT;
Integer status = localTrans.get(transId);
if (null != status) {
switch (status) {
case 0:
retState = RocketMQLocalTransactionState.UNKNOWN;
break;
case 1:
retState = RocketMQLocalTransactionState.COMMIT;
break;
case 2:
retState = RocketMQLocalTransactionState.COMMIT;
break;
}
}
System.out.printf("------ !!! checkLocalTransaction is executed once," +
" msgTransactionId=%s, TransactionState=%s status=%s %n",
transId, retState, status);
return retState;
}
示例5
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
String transId = (String) msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
System.out.printf("#### executeLocalTransaction is executed, msgTransactionId=%s %n",
transId);
int value = transactionIndex.getAndIncrement();
int status = value % 3;
localTrans.put(transId, status);
if (status == 0) {
// Return local transaction with success(commit), in this case,
// this message will not be checked in checkLocalTransaction()
System.out.printf(" # COMMIT # Simulating msg %s related local transaction exec succeeded! ### %n", msg.getPayload());
return RocketMQLocalTransactionState.COMMIT;
}
if (status == 1) {
// Return local transaction with failure(rollback) , in this case,
// this message will not be checked in checkLocalTransaction()
System.out.printf(" # ROLLBACK # Simulating %s related local transaction exec failed! %n", msg.getPayload());
return RocketMQLocalTransactionState.ROLLBACK;
}
System.out.printf(" # UNKNOW # Simulating %s related local transaction exec UNKNOWN! \n");
return RocketMQLocalTransactionState.UNKNOWN;
}
示例6
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
String transId = (String) msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
RocketMQLocalTransactionState retState = RocketMQLocalTransactionState.COMMIT;
Integer status = localTrans.get(transId);
if (null != status) {
switch (status) {
case 0:
retState = RocketMQLocalTransactionState.UNKNOWN;
break;
case 1:
retState = RocketMQLocalTransactionState.COMMIT;
break;
case 2:
retState = RocketMQLocalTransactionState.ROLLBACK;
break;
}
}
System.out.printf("------ !!! checkLocalTransaction is executed once," +
" msgTransactionId=%s, TransactionState=%s status=%s %n",
transId, retState, status);
return retState;
}
示例7
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
//事务ID
String transId = (String) message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
System.out.printf("#### executeLocalTransaction is executed, msgTransactionId=%s %n", transId);
//获取增量
int value = transactionIndex.getAndIncrement();
int status = value % 3;
localTrans.put(transId, status);
if (status == 0) {
System.out.printf(" # COMMIT # 模拟消息 %s 本地事务执行成功! ### %n", message.getPayload());
return RocketMQLocalTransactionState.COMMIT;
}
if (status == 1) {
System.out.printf(" # COMMIT # 模拟消息 %s 本地事务执行失败! ### %n", message.getPayload());
return RocketMQLocalTransactionState.ROLLBACK;
}
System.out.printf(" # UNKNOW # 模拟 %s 本地事务执行异常! \n");
return RocketMQLocalTransactionState.UNKNOWN;
}
示例8
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
//事务ID
String transId = (String) message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
RocketMQLocalTransactionState transactionState = RocketMQLocalTransactionState.COMMIT;
Integer status = localTrans.get(transId);
if (null != status) {
switch (status) {
case 0:
transactionState = RocketMQLocalTransactionState.UNKNOWN;
break;
case 1:
transactionState = RocketMQLocalTransactionState.COMMIT;
break;
case 2:
transactionState = RocketMQLocalTransactionState.ROLLBACK;
break;
}
}
System.out.printf("------ !!! checkLocalTransaction is executed once," +
" msgTransactionId=%s, TransactionState=%s status=%s %n",
transId, transactionState, status);
return transactionState;
}
示例9
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg,
Object arg) {
Object num = msg.getHeaders().get("test");
if ("1".equals(num)) {
System.out.println(
"executer: " + new String((byte[]) msg.getPayload()) + " unknown");
return RocketMQLocalTransactionState.UNKNOWN;
}
else if ("2".equals(num)) {
System.out.println(
"executer: " + new String((byte[]) msg.getPayload()) + " rollback");
return RocketMQLocalTransactionState.ROLLBACK;
}
System.out.println(
"executer: " + new String((byte[]) msg.getPayload()) + " commit");
return RocketMQLocalTransactionState.COMMIT;
}
示例10
/**
* 事务回查接口
*
* 如果事务消息一直没提交,则定时判断订单数据是否已经插入
* 是:提交事务消息
* 否:回滚事务消息
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
String orderId = (String)message.getHeaders().get("orderId");
System.out.println("============事务回查-orderId:" + orderId);
//判断之前的事务是否已经提交:订单记录是否已经保存
int count = 1;
//select count(1) from t_order where order_id = ${orderId}
System.out.println("============事务回查-订单已生成-提交事务消息");
return count > 0 ? RocketMQLocalTransactionState.COMMIT : RocketMQLocalTransactionState.ROLLBACK;
}
示例11
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
Object num = msg.getHeaders().get("test");
if ("1".equals(num)) {
System.out.println("executer: " + new String((byte[]) msg.getPayload()) + " unknown");
return RocketMQLocalTransactionState.UNKNOWN;
} else if ("2".equals(num)) {
System.out.println("executer: " + new String((byte[]) msg.getPayload()) + " rollback");
return RocketMQLocalTransactionState.ROLLBACK;
}
System.out.println("executer: " + new String((byte[]) msg.getPayload()) + " commit");
return RocketMQLocalTransactionState.COMMIT;
}
示例12
/**
* RocketMQ回查本地事务状态
* @param message 消息
* @return RocketMQ事务状态
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
MessageHeaders headers = message.getHeaders();
String transicationId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
log.info("RocketMQ事务状态回查=>{}",transicationId);
// 从数据库中根据事务Id查询对应的事务日志,对应图中第6步
FwTransactionLog transactionLog = fwTransactionLogService.getOne(
new LambdaQueryWrapper<FwTransactionLog>().eq(FwTransactionLog::getTransactionId, transicationId)
);
// 对应图中的第7步骤
return transactionLog != null ? RocketMQLocalTransactionState.COMMIT : RocketMQLocalTransactionState.ROLLBACK;
}
示例13
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg,
Object arg) {
fooService.checkInfo(msg);
if ("1".equals(msg.getHeaders().get("error"))) {
System.out.println(new String((byte[]) msg.getPayload()) + " rollback");
return RocketMQLocalTransactionState.ROLLBACK;
}
return RocketMQLocalTransactionState.COMMIT;
}
示例14
private static LocalTransactionState convertLocalTransactionState(RocketMQLocalTransactionState state) {
switch (state) {
case UNKNOWN:
return LocalTransactionState.UNKNOW;
case COMMIT:
return LocalTransactionState.COMMIT_MESSAGE;
case ROLLBACK:
return LocalTransactionState.ROLLBACK_MESSAGE;
}
// Never happen
log.warn("Failed to covert enum type RocketMQLocalTransactionState {}.", state);
return LocalTransactionState.UNKNOW;
}
示例15
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
System.out.println("check: " + new String((byte[]) msg.getPayload()));
return RocketMQLocalTransactionState.COMMIT;
}
示例16
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
System.out.println(msg + " check local");
return RocketMQLocalTransactionState.COMMIT;
}
示例17
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
return RocketMQLocalTransactionState.COMMIT;
}
示例18
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
return RocketMQLocalTransactionState.COMMIT;
}
示例19
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
System.out.printf("ExtTransactionListenerImpl executeLocalTransaction and return UNKNOWN. \n");
return RocketMQLocalTransactionState.UNKNOWN;
}
示例20
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
System.out.printf("ExtTransactionListenerImpl checkLocalTransaction and return COMMIT. \n");
return RocketMQLocalTransactionState.COMMIT;
}
示例21
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
System.out.println("check: " + new String((byte[]) msg.getPayload()));
return RocketMQLocalTransactionState.COMMIT;
}
示例22
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// ... local transaction process, return ROLLBACK, COMMIT or UNKNOWN
return RocketMQLocalTransactionState.UNKNOWN;
}
示例23
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
// ... check transaction status and return ROLLBACK, COMMIT or UNKNOWN
return RocketMQLocalTransactionState.COMMIT;
}