coroutine은 Thread-Safety와 DownStream으로의 문제 전달을 막기 위해 flow 블록 내에서는 Context를 하나만 사용 할 수 있다.
<aside> 💡 UpStream / DownStream
UpStream : flowOn 이전의 생산자, 중간 연산자
DownStream : flowOn 이후 연산자
</aside>
이런 문제를 위해 rx의 subscribeOn과 유사하게 flowOn을 통해 context를 바꾸어줄수있지만 하나의 블록안에서 동적으로 바꾸거나 callback 함수에서는 사용 할 수가 없다.
이러한 문제를 위해 channelFlow와 callbackFlow가 있다.
이 둘은 기능적으로 큰 차이가 없다.
💙 예시
//1초 안에 api가 중복으로 호출된다면 제일 마지막 요청만 서버에 전달한다.
private var deferred: Deferred<NetworkResult<ResRecordHistoryData>>? = null
override suspend fun sendToServer(params: Int, delay: Long) = channelFlow {
Timber.e("===1. 서버 요청=== $registerWalkingDeferred")
trySend(NetworkResult.Loading)
CoroutineScope(ioDispatcher).launch {
Timber.e("===2. 이전 요청 취소===")
deferred?.cancelAndJoin()
deferred = async {
Timber.e("===3. 1초 지연 $delay===")
delay(delay)
Timber.e("===4. 실제 서버 호출===")
return@async safeApiCall { apipiService.sendToServer(params) }
}
deferred?.await()?.let {
Timber.d("===5. 데이터 전송 it = $it")
trySend(it)
}
}
awaitClose()
}
//api 호출전 다른 api를 체크하고 그 결과값을 넘겨 받고 싶을 때
enum class ApiProcess {
HOLD,
NEW_TOKEN,
VALID_TOKEN,
LOGOUT,
NETWORK_UNSTABLE
}
/**
* 로그인 토큰 관련 데이터
*/
private val _checkTokenFlow = MutableStateFlow(ApiProcess.HOLD)
val checkTokenFlow = _checkTokenFlow.asStateFlow()
/**
* 로그인 코든 관련 flow 구독을 해제 할 때 사용
*/
suspend fun <E> SendChannel<E>.closeTokenApi() {
_checkTokenFlow.emit(ApiProcess.HOLD)
close()
}
fun <T> checkTokenApiCall(
dataStoreRepository: DataStoreRepository,
apiCall: suspend () -> Response<T>
) = channelFlow {
Timber.d("checkTokenApiCall trySend Loading")
//1. 로딩
trySend(NetworkResult.Loading)
//2. 로그인 토큰 결과값
launch {
checkTokenFlow.collect {
Timber.d("checkTokenApiCall it = $it")
when (it) {
ApiProcess.NEW_TOKEN,
ApiProcess.VALID_TOKEN -> {
val isBlockApi = dataStoreRepository.isBlockApi()
Timber.d("checkTokenApiCall okhttp isBlockApi = $isBlockApi")
if (isBlockApi) { //다른 디바이스에서 로그인이 되어 api 호출을 할 필요가 없을때
trySend(NetworkResult.Empty)
} else {
trySend(safeApiCall(apiCall))
}
Timber.d("checkTokenApiCall awaitClose")
channel.closeTokenApi()
}
ApiProcess.LOGOUT -> {
trySend(NetworkResult.Error(ErrorHandler.ERROR_TOKEN_EXPIRED))
Timber.d("checkTokenApiCall awaitClose")
channel.closeTokenApi()
}
ApiProcess.NETWORK_UNSTABLE -> {
trySend(NetworkResult.Error(ErrorHandler.ERROR_NETWORK_CONNECTION))
Timber.d("checkTokenApiCall awaitClose")
channel.closeTokenApi()
}
ApiProcess.HOLD -> Timber.d("checkTokenApiCall ApiProcess.HOLD")
}
}
}
//3. 로그인 토큰 체크 api
launch {
Timber.d("checkTokenApiCall launch checkTokenExpired")
checkTokenExpired()
}
}
출처 : https://medium.com/harrythegreat/kotlin-코루틴의-callbackflow와-channelflow-f4e66c9fa116