在Spring中使用非同步事件實現同步事務
結合Scala+Spring,我們將採取一個很簡單的場景:下訂單,然後發送一封電子郵件。
編製一個服務:
@Service
class OrderService @Autowired() (orderDao: OrderDao, mailNotifier: OrderMailNotifier) {
@Transactional
def placeOrder(order: Order) {
orderDao save order //保存訂單
mailNotifier sendMail order //發送郵件
}
}
上面代碼是在保存訂單和發送郵件兩個同步執行,發送郵件需要連接郵件伺服器,比較耗時,拖延了整個性能,我們採取非同步發送電子郵件,利用Spring內置的自定義事件,與JMS或其他生產者 - 消費者類似。
case class OrderPlacedEvent(order: Order) extends ApplicationEvent
@Service
class OrderService @Autowired() (orderDao: OrderDao, eventPublisher: ApplicationEventPublisher) {
@Transactional
def placeOrder(order: Order) {
orderDao save order
eventPublisher publishEvent OrderPlacedEvent(order)
}
}
區別是繼承了ApplicationEvent 之前是直接用 OrderMailNotifier 直接發送,而現在我們使用ApplicationEventPublisher 發送發郵件事件了。
事件監聽者代碼如下:
@Service
class OrderMailNotifier extends ApplicationListener[OrderPlacedEvent] {
def onApplicationEvent(event: OrderPlacedEvent) {
//sending e-mail...
}
}
在監聽者方法中真正實現郵件發送。
但是Spring的ApplicationEvents是同步事件,意味著我們並沒有真正實現非同步,程序還會在這裡堵塞,如果希望非同步,我們需要重新定義一個ApplicationEventMulticaster,實現類型SimpleApplicationEventMulticaster和TaskExecutor:
@Bean
def applicationEventMulticaster() = {
val multicaster = new SimpleApplicationEventMulticaster()
multicaster.setTaskExecutor(taskExecutor())
multicaster
}
@Bean
def taskExecutor() = {
val pool = new ThreadPoolTaskExecutor()
pool.setMaxPoolSize(10)
pool.setCorePoolSize(10)
pool.setThreadNamePrefix("Spring-Async-")
pool
}
Spring通過使用TaskExecutor已經支持廣播事件了,對onApplicationEvent() 標註 @Async
@Async
def onApplicationEvent(event: OrderPlacedEvent) { //...
如果你希望使用@Async,可以編製自己的非同步執行器:
@Configuration
@EnableAsync
class ThreadingConfig extends AsyncConfigurer {
def getAsyncExecutor = taskExecutor()
@Bean
def taskExecutor() = {
val pool = new ThreadPoolTaskExecutor()
pool.setMaxPoolSize(10)
pool.setCorePoolSize(10)
pool.setThreadNamePrefix("Spring-Async-")
pool
}
}
@ EnableAsync是足夠了。,默認情況下,Spring使用SimpleAsyncTaskExecutor類創建新的線程。
以上所有設置暴露一個真正的問題。現在,我們雖然使用其他線程發送一個非同步消息處理。不幸的是,我們引入競爭條件。
- 開始事務
- 存儲order到資料庫
- 發送一個包裝order的消息
- 確認
非同步線程獲得OrderPlacedEvent並開始處理。現在的問題是,它發生(3)之後,還是(4)之前或者(4)之後?這有一個很大的區別!在前者的情況下,交易也尚未提交訂單所以不存在於資料庫中。另一方面,延遲載入可能已經在工作,致使訂單對象仍然然綁定在 PersistenceContext(預設我們使用JPA)。
解決辦法是使用 TransactionSynchronizationManager.,可以註冊很多監聽者 TransactionSynchronization,它對於事務的提交或回滾都有事件發送。
@Transactional
def placeOrder(order: Order) {
orderDao save order
afterCommit {
eventPublisher publishEvent OrderPlacedEvent(order)
}
}
private def afterCommit[T](fun: => T) {
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter {
override def afterCommit() {
fun
}
})
}
當前事務提交後 afterCommit()接受調用,可以安全地調用registerSynchronization()多次 - 監聽器存儲在Set並且本地保存到當前事務中,事務提交後消失。
我們將afterCommit方法單獨抽象成一個類,分離關注。
class TransactionAwareApplicationEventPublisher(delegate: ApplicationEventPublisher)
extends ApplicationEventPublisher {
override def publishEvent(event: ApplicationEvent) {
if (TransactionSynchronizationManager.isActualTransactionActive) {
TransactionSynchronizationManager.registerSynchronization(
new TransactionSynchronizationAdapter {
override def afterCommit() {
delegate publishEvent event
}
})
}
else
delegate publishEvent event
}
}
TransactionAwareApplicationEventPublisher是實現Spring的ApplicationEventPublisher。
我們要將這個新的實現告訴Spring替換掉舊的,用@Primary:
@Resource
val applicationContext: ApplicationContext = null
@Bean
@Primary
def transactionAwareApplicationEventPublisher() =
new TransactionAwareApplicationEventPublisher(applicationContext)
再看看原來的訂單服務:
@Service
class OrderService @Autowired() (orderDao: OrderDao, eventPublisher: ApplicationEventPublisher) {
@Transactional
def placeOrder(order: Order) {
orderDao save order
eventPublisher publishEvent OrderPlacedEvent(order)
}
注意這裡ApplicationEventPublisher已經是我們自己實現的TransactionAwareApplicationEventPublisher,將被自動注入這個服務。
最後,要在真正訂單保存的業務代碼上放置事務:
def placeOrder(order: Order) {
storeOrder(order)
eventPublisher publishEvent OrderPlacedEvent(order)
}
@Transactional
def storeOrder(order: Order) = orderDao save order
當然這沒有根本解決問題,如果placeOrder有一個更大的事務怎麼辦?
打開今日頭條,查看更多圖片※近20個絢麗實用的jQuery/CSS3側邊欄菜單
※如何在前端進行簡訊介面和語音驗證介面的使用
TAG:程序員小新人學習 |