當前位置:
首頁 > 知識 > 在Spring中使用非同步事件實現同步事務

在Spring中使用非同步事件實現同步事務

結合Scala+Spring,我們將採取一個很簡單的場景:下訂單,然後發送一封電子郵件。

編製一個服務:

@Service

class OrderService @Autowired() (orderDao: OrderDao, mailNotifier: OrderMailNotifier) {

@Transactional

def placeOrder(order: Order) {

orderDao save order //保存訂單

mailNotifier sendMail order //發送郵件

}

}

上面代碼是在保存訂單和發送郵件兩個同步執行,發送郵件需要連接郵件伺服器,比較耗時,拖延了整個性能,我們採取非同步發送電子郵件,利用Spring內置的自定義事件,與JMS或其他生產者 - 消費者類似。

case class OrderPlacedEvent(order: Order) extends ApplicationEvent

@Service

class OrderService @Autowired() (orderDao: OrderDao, eventPublisher: ApplicationEventPublisher) {

@Transactional

def placeOrder(order: Order) {

orderDao save order

eventPublisher publishEvent OrderPlacedEvent(order)

}

}

區別是繼承了ApplicationEvent 之前是直接用 OrderMailNotifier 直接發送,而現在我們使用ApplicationEventPublisher 發送發郵件事件了。

事件監聽者代碼如下:

@Service

class OrderMailNotifier extends ApplicationListener[OrderPlacedEvent] {

def onApplicationEvent(event: OrderPlacedEvent) {

//sending e-mail...

}

}

在監聽者方法中真正實現郵件發送。

但是Spring的ApplicationEvents是同步事件,意味著我們並沒有真正實現非同步,程序還會在這裡堵塞,如果希望非同步,我們需要重新定義一個ApplicationEventMulticaster,實現類型SimpleApplicationEventMulticaster和TaskExecutor:

@Bean

def applicationEventMulticaster() = {

val multicaster = new SimpleApplicationEventMulticaster()

multicaster.setTaskExecutor(taskExecutor())

multicaster

}

@Bean

def taskExecutor() = {

val pool = new ThreadPoolTaskExecutor()

pool.setMaxPoolSize(10)

pool.setCorePoolSize(10)

pool.setThreadNamePrefix("Spring-Async-")

pool

}

Spring通過使用TaskExecutor已經支持廣播事件了,對onApplicationEvent() 標註 @Async

@Async

def onApplicationEvent(event: OrderPlacedEvent) { //...

如果你希望使用@Async,可以編製自己的非同步執行器:

@Configuration

@EnableAsync

class ThreadingConfig extends AsyncConfigurer {

def getAsyncExecutor = taskExecutor()

@Bean

def taskExecutor() = {

val pool = new ThreadPoolTaskExecutor()

pool.setMaxPoolSize(10)

pool.setCorePoolSize(10)

pool.setThreadNamePrefix("Spring-Async-")

pool

}

}

@ EnableAsync是足夠了。,默認情況下,Spring使用SimpleAsyncTaskExecutor類創建新的線程。

以上所有設置暴露一個真正的問題。現在,我們雖然使用其他線程發送一個非同步消息處理。不幸的是,我們引入競爭條件。

  1. 開始事務
  2. 存儲order到資料庫
  3. 發送一個包裝order的消息
  4. 確認

非同步線程獲得OrderPlacedEvent並開始處理。現在的問題是,它發生(3)之後,還是(4)之前或者(4)之後?這有一個很大的區別!在前者的情況下,交易也尚未提交訂單所以不存在於資料庫中。另一方面,延遲載入可能已經在工作,致使訂單對象仍然然綁定在 PersistenceContext(預設我們使用JPA)。

解決辦法是使用 TransactionSynchronizationManager.,可以註冊很多監聽者 TransactionSynchronization,它對於事務的提交或回滾都有事件發送。

@Transactional

def placeOrder(order: Order) {

orderDao save order

afterCommit {

eventPublisher publishEvent OrderPlacedEvent(order)

}

}

private def afterCommit[T](fun: => T) {

TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter {

override def afterCommit() {

fun

}

})

}

當前事務提交後 afterCommit()接受調用,可以安全地調用registerSynchronization()多次 - 監聽器存儲在Set並且本地保存到當前事務中,事務提交後消失。

我們將afterCommit方法單獨抽象成一個類,分離關注。

class TransactionAwareApplicationEventPublisher(delegate: ApplicationEventPublisher)

extends ApplicationEventPublisher {

override def publishEvent(event: ApplicationEvent) {

if (TransactionSynchronizationManager.isActualTransactionActive) {

TransactionSynchronizationManager.registerSynchronization(

new TransactionSynchronizationAdapter {

override def afterCommit() {

delegate publishEvent event

}

})

}

else

delegate publishEvent event

}

}

TransactionAwareApplicationEventPublisher是實現Spring的ApplicationEventPublisher。

我們要將這個新的實現告訴Spring替換掉舊的,用@Primary:

@Resource

val applicationContext: ApplicationContext = null

@Bean

@Primary

def transactionAwareApplicationEventPublisher() =

new TransactionAwareApplicationEventPublisher(applicationContext)

再看看原來的訂單服務:

@Service

class OrderService @Autowired() (orderDao: OrderDao, eventPublisher: ApplicationEventPublisher) {

@Transactional

def placeOrder(order: Order) {

orderDao save order

eventPublisher publishEvent OrderPlacedEvent(order)

}

注意這裡ApplicationEventPublisher已經是我們自己實現的TransactionAwareApplicationEventPublisher,將被自動注入這個服務。

最後,要在真正訂單保存的業務代碼上放置事務:

def placeOrder(order: Order) {

storeOrder(order)

eventPublisher publishEvent OrderPlacedEvent(order)

}

@Transactional

def storeOrder(order: Order) = orderDao save order

當然這沒有根本解決問題,如果placeOrder有一個更大的事務怎麼辦?

在Spring中使用非同步事件實現同步事務

打開今日頭條,查看更多圖片
喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 程序員小新人學習 的精彩文章:

近20個絢麗實用的jQuery/CSS3側邊欄菜單
如何在前端進行簡訊介面和語音驗證介面的使用

TAG:程序員小新人學習 |