Java事务处理实战:存储过程、文件与消息队列
Java特殊场景事务处理实战指南
1. 存储过程调用的事务控制
存储过程作为数据库预编译的程序单元,在Java应用中调用时需要特别注意事务边界控制。
1.1 事务传播机制
@Service
public class OrderService {
@Autowired
private JdbcTemplate jdbcTemplate;
@Transactional
public void processOrderWithProcedure(Long orderId) {
// 调用存储过程
jdbcTemplate.execute(
"{call UPDATE_ORDER_STATUS(?, ?)}",
new Object[]{orderId, "PROCESSING"}
);
// 其他业务操作
updateInventory(orderId);
}
}
关键点:
- 存储过程内部的事务与外部事务的关系取决于数据库实现
- Oracle的自治事务(
PRAGMA AUTONOMOUS_TRANSACTION
)会使存储过程独立提交 - MySQL存储过程默认参与外部事务
1.2 最佳实践
- 明确事务边界:在存储过程文档中注明其事务行为
- 异常处理:捕获
SQLException
并转换为Spring的DataAccessException
- 性能考虑:避免在存储过程中执行耗时操作导致长事务
2. 文件操作与事务一致性
2.1 文件上传+DB写入的原子性保障
实现方案:
public class FileUploadService {
@Value("${file.temp.dir}")
private String tempDir;
@Value("${file.final.dir}")
private String finalDir;
@Transactional
public void uploadFileWithTransaction(MultipartFile file, FileMetadata metadata) {
// 1. 保存到临时位置
Path tempPath = Paths.get(tempDir, file.getOriginalFilename());
Files.copy(file.getInputStream(), tempPath);
try {
// 2. 保存元数据到数据库
fileMetadataRepository.save(metadata);
// 3. 移动到正式位置
Path finalPath = Paths.get(finalDir, metadata.getStoredName());
Files.move(tempPath, finalPath);
} catch (Exception e) {
// 4. 失败时清理临时文件
Files.deleteIfExists(tempPath);
throw e;
}
}
}
注意事项:
- 使用临时目录避免部分可见的文件状态
- 考虑使用分布式文件系统时的一致性保证
- 大文件上传建议分片处理
3. 消息队列与事务联动
3.1 事务性消息(RocketMQ示例)
@RestController
public class OrderController {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Transactional
@PostMapping("/createOrder")
public String createOrder(@RequestBody Order order) {
// 1. 保存订单
orderRepository.save(order);
// 2. 发送事务消息
TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
"order-topic",
MessageBuilder.withPayload(order).build(),
order
);
if (result.getLocalTransactionState() != LocalTransactionState.COMMIT_MESSAGE) {
throw new RuntimeException("消息发送失败");
}
return "success";
}
}
// 事务监听器
@RocketMQTransactionListener
class OrderTransactionListenerImpl implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
Order order = (Order) arg;
try {
// 执行本地事务
inventoryService.reduceStock(order.getProductId(), order.getQuantity());
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
return RocketMQLocalTransactionState.ROLLBACK;
}
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
// 检查本地事务状态
String orderId = msg.getHeaders().get("orderId").toString();
return orderRepository.existsById(orderId) ?
RocketMQLocalTransactionState.COMMIT :
RocketMQLocalTransactionState.ROLLBACK;
}
}
3.2 本地事务表+定时任务方案
实现代码:
@Entity
@Table(name = "outbox_messages")
public class OutboxMessage {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private String topic;
@Column(columnDefinition = "TEXT")
private String payload;
private String status;
private Date createdAt;
private Date processedAt;
// getters & setters
}
@Service
@Transactional
public class OrderService {
@Autowired
private OutboxMessageRepository outboxRepository;
@Autowired
private OrderRepository orderRepository;
public void createOrder(Order order) {
// 1. 保存订单
orderRepository.save(order);
// 2. 写入发件箱
OutboxMessage message = new OutboxMessage();
message.setTopic("order.created");
message.setPayload(JsonUtils.toJson(order));
message.setStatus("PENDING");
message.setCreatedAt(new Date());
outboxRepository.save(message);
}
}
@Component
public class MessageRelay {
@Autowired
private OutboxMessageRepository outboxRepository;
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Scheduled(fixedRate = 5000)
public void pollAndSendMessages() {
List<OutboxMessage> messages = outboxRepository
.findByStatus("PENDING", PageRequest.of(0, 100));
for (OutboxMessage message : messages) {
try {
kafkaTemplate.send(message.getTopic(), message.getPayload());
message.setStatus("SENT");
message.setProcessedAt(new Date());
outboxRepository.save(message);
} catch (Exception e) {
log.error("消息发送失败: {}", message.getId(), e);
}
}
}
}
方案对比:
方案 | 优点 | 缺点 | 适用场景 |
---|---|---|---|
事务消息(RocketMQ) | 实时性高,实现简单 | 依赖MQ厂商实现 | 金融支付等高一致性要求场景 |
本地事务表 | 通用性强,不依赖特定MQ | 有一定延迟,需要额外表维护 | 电商订单等最终一致性场景 |
实践建议
存储过程事务:
- 明确文档记录存储过程的事务行为
- 避免在存储过程中做业务逻辑判断
- 考虑使用
@Transactional(propagation = Propagation.NOT_SUPPORTED)
暂停事务
文件事务:
- 采用"写临时文件→DB提交→移动文件"的三段式提交
- 定期清理残留的临时文件
- 考虑使用HDFS等支持原子操作的文件系统
消息事务:
- 事务消息优先选择RocketMQ等原生支持的产品
- 本地事务表方案建议增加幂等消费处理
- 监控消息积压情况,设置合理的重试策略
通过合理运用这些模式,可以在Java应用中实现复杂场景下的数据一致性保障。