coroutine은 Thread-Safety와 DownStream으로의 문제 전달을 막기 위해 flow 블록 내에서는 Context를 하나만 사용 할 수 있다.

<aside> 💡 UpStream / DownStream

UpStream : flowOn 이전의 생산자, 중간 연산자

DownStream : flowOn 이후 연산자

스크린샷 2024-01-17 오전 9.36.24.png

</aside>

이런 문제를 위해 rx의 subscribeOn과 유사하게 flowOn을 통해 context를 바꾸어줄수있지만 하나의 블록안에서 동적으로 바꾸거나 callback 함수에서는 사용 할 수가 없다.

이러한 문제를 위해 channelFlow와 callbackFlow가 있다.

이 둘은 기능적으로 큰 차이가 없다.

💙 예시

//1초 안에 api가 중복으로 호출된다면 제일 마지막 요청만 서버에 전달한다.

private var deferred: Deferred<NetworkResult<ResRecordHistoryData>>? = null
    override suspend fun sendToServer(params: Int, delay: Long) = channelFlow {
        Timber.e("===1. 서버 요청=== $registerWalkingDeferred")
        trySend(NetworkResult.Loading)

        CoroutineScope(ioDispatcher).launch {
            Timber.e("===2. 이전 요청 취소===")
            deferred?.cancelAndJoin()

            deferred = async {
                Timber.e("===3. 1초 지연 $delay===")
                delay(delay)
                Timber.e("===4. 실제 서버 호출===")
                return@async safeApiCall { apipiService.sendToServer(params) }
            }

            deferred?.await()?.let {
                Timber.d("===5. 데이터 전송 it = $it")
                trySend(it)
            }
        }
				awaitClose()
    }

//api 호출전 다른 api를 체크하고 그 결과값을 넘겨 받고 싶을 때

enum class ApiProcess {
    HOLD,
    NEW_TOKEN,
    VALID_TOKEN,
    LOGOUT,
    NETWORK_UNSTABLE
}

/**
 * 로그인 토큰 관련 데이터
 */
private val _checkTokenFlow = MutableStateFlow(ApiProcess.HOLD)
val checkTokenFlow = _checkTokenFlow.asStateFlow()

/**
 * 로그인 코든 관련 flow 구독을 해제 할 때 사용
 */
suspend fun <E> SendChannel<E>.closeTokenApi() {
    _checkTokenFlow.emit(ApiProcess.HOLD)
    close()
}

fun <T> checkTokenApiCall(
    dataStoreRepository: DataStoreRepository,
    apiCall: suspend () -> Response<T>
) = channelFlow {
    Timber.d("checkTokenApiCall trySend Loading")
    //1. 로딩
    trySend(NetworkResult.Loading)

    //2. 로그인 토큰 결과값
    launch {
        checkTokenFlow.collect {
            Timber.d("checkTokenApiCall it = $it")
            when (it) {
                ApiProcess.NEW_TOKEN,
                ApiProcess.VALID_TOKEN -> {
                    val isBlockApi = dataStoreRepository.isBlockApi()
                    Timber.d("checkTokenApiCall okhttp isBlockApi = $isBlockApi")
                    if (isBlockApi) {   //다른 디바이스에서 로그인이 되어 api 호출을 할 필요가 없을때
                        trySend(NetworkResult.Empty)
                    } else {
                        trySend(safeApiCall(apiCall))
                    }
                    Timber.d("checkTokenApiCall awaitClose")
                    channel.closeTokenApi()
                }
                ApiProcess.LOGOUT -> {
                    trySend(NetworkResult.Error(ErrorHandler.ERROR_TOKEN_EXPIRED))
                    Timber.d("checkTokenApiCall awaitClose")
                    channel.closeTokenApi()
                }
                ApiProcess.NETWORK_UNSTABLE -> {
                    trySend(NetworkResult.Error(ErrorHandler.ERROR_NETWORK_CONNECTION))
                    Timber.d("checkTokenApiCall awaitClose")
                    channel.closeTokenApi()
                }
                ApiProcess.HOLD -> Timber.d("checkTokenApiCall ApiProcess.HOLD")
            }
        }
    }

    //3. 로그인 토큰 체크 api
    launch {
        Timber.d("checkTokenApiCall launch checkTokenExpired")
        checkTokenExpired()
    }
}

출처 : https://medium.com/harrythegreat/kotlin-코루틴의-callbackflow와-channelflow-f4e66c9fa116