Notice
Recent Posts
Recent Comments
Link
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |
Tags
- processing time
- 불공변
- Generic
- Perfect Watermarks
- ingestion time
- event time
- flink watermark
- covariant
- watermark
- Heuristic Watermark
- watermarks
- coroutines
- lambda
- MapReduce
- Perfect Watermark
- 공변
- kotlin
- flink
- HDFS
- apache flink
- java
- Hadoop
- flink watermarks
- 가변성
- Coroutine
- contravariant
- Stream
- Generics
- Heuristic Watermarks
Archives
- Today
- Total
mm Home
Kotlin Coroutines 본문
글에 앞서.. 2019년에 공부하며 작성한 글이다보니 당시 버전과 2021년 버전은 꽤나 다른것 같습니다
개념은 비슷하겠지만.. 참고 부탁드립니다
코루틴이란?
- 협력형 멀티태스킹을 위한 프로그래밍 요소.
- 새로운 동시성 프로그래밍 방법.
- 코루틴을 경량화 스레드라고 자주 표현한다.
- 쉽게 보면, 중단가능한 Function이다.
이해가 잘 안되는데… 왜 나온거지?
- 코루틴 개념과 용어는 이미 1958년에 나옴.
- 서브루틴을 사용하지 않으면 공통된 기능을 묶지도 못하고, 모든 코드를 한번에 메모리에 올려야 하기 때문에 개발시에도, 런타임시에도 비효율적. (중복되는 루틴에 대해서 메모리 비효율적 - 생성해야하기 때문에)
- 서브루틴은 별도의 메모리에 올려놓고, 서브루틴이 호출될때마다 저장된 메모리로 이동했다가, return 될 때 호출했던 호출자(caller)의 메모리 위치로 돌아간다. 호출할 때마다 매번 같은 위치로 이동하기 때문에 효율적이다.
- 코루틴도 서브루틴과 동일하지만 진입점과 반환점이 하나밖에 없어서 호출자에 종속적인 서브루틴과 달리, 다수의 루틴이 종속성없이 서로 같이 동작한다는 것이다.
쉽게 설명하면…
- 코루틴 Function에는 suspend 키워드를 붙이게 되는데, 현재 동작중인 스레드를 차단하지 않고 코루틴 실행을 일시 중단함을 의미한다.
- 결국 코루틴은.. 함수간에 일시 중단가능한 함수라는 것이다.
- 일시 중단되었을 때 해당 스레드로 다른 코루틴을 실행하겠다!
- routines 간 co-working 한다고 보면 된다
- 경량화 스레드라고 불리는 이유는 스레드의 라이프 사이클과 비슷하고, 스레드에 의해 코루틴이 실행되지만 스레드에 종속적이지 않기 때문에 스레드를 신경쓸 필요 없이 보여서 코루틴만 신경쓰면 되므로 스레드처럼 보이는게 있어서 인것 같다.
- 스레드는 native 스레드 (OS단)에 직접 매핑되어 관리하는 반면, 코루틴은 실제 사용하는 유저가 관리하기 때문에 오버헤드가 적다. (컨텍스트 스위칭)
- 코루틴은 스레드의 생성보다 훨씬 빠르고 저렴한 비용이 드는 것이 특징
코루틴은 뭐가 좋은가?
- 전통적으로 context switching은 OS 의 스케쥴링 정책에 맡겨져 있다.
- context switching 이 발생하면 Cache를 초기화해야 한다.
- memory mapping 된 데이터도 다시 초기화해야 한다.
- 이 일련의 동작이 아주 무겁다.
// Original synchronous code (익숙한 코드)
fun postItem1(item: Item) {
val token = requestToken()
val post = createPost(token, item)
processPost(post)
}
// Callback 을 이용한 비동기 처리
fun postItem2(item: Item) {
requestTokenAsync{ token ->
createPostAsync(token, item) { post ->
processPost(post)
}
}
}
// CompletableFuture
// callback 보다 훨씬 깔끔한 코드.
// 하지만 예외처리나 복잡한 combinator 가 생기면 처리하기가 어렵고 복잡해짐
fun postItem3(item: Item) {
requestTokenAsync2()
.thenCompose{ token -> createPostAsync2(token, item)}
.thenAccept{ post -> processPost(post)}
}
// Reactive 도 비슷
fun postItem31(item: Item) {
requestTokenAsyncRx()
.map{ token -> createPost(token, item) }
.map{ post -> processPost(post) }
.subscribe()
}
// Coroutine
// 코루틴은 regular-formal code. 단지 suspend 키워드만 추가.
// 코루틴을 생성해야하는 부분이 있지만 그래도 평범한 코드로 개발. (비동기식 코드를 동기식 코드처럼 개발)
// 예외처리 & if & loop etc… 일반 코드처럼 가능. 코틀린에서 지원하는 고차함수도 어떤것이든 다 적용 가능.
// suspend 함수는 코루틴이나 suspend 함수 안에서만 호출 가능.
suspend fun postItem4(item: Item) {
val token = requestTokenLaunch()
val post = createPostForCoroutine(token, item)
processPostForCoroutine(post)
}
OK. 어떻게 사용하는건지??
- Coroutine Context를 정의해주어야 한다.
- suspend로 Function이 코루틴임을 명시해야한다.
- 일반 Function에서는 coroutine Function을 사용하지 못한다.
- 일반 Function에서 coroutine Function을 사용하려면 Coroutine Context를 정의해주면 된다.
suspend fun createPost(token: Token) {
//new Thread() 로 생각하면 된다.
launch {
delay(1000)
//TODO something
}
}
// Context정의가 없어서 상위의 Coroutine Context를 사용한다.
@Test
fun doSomething() {
//Compile error
launch {
println("start")
delay(1000)
println("end")
}
}
@Test
fun doSomething() {
runBlocking {
launch {
println("start")
delay(1000)
println("end")
}
}
}
// Context정의가 있어서 실행 가능하다.
@Test
fun doSomething() {
GlobalScope.launch {
println("start")
delay(1000)
println("end")
}
}
// GlobalScope로 default Context를 이용하므로 실행 가능하다.
// 그런데 프로세스가 바로 종료된다. 프로세스는 Coroutines 을 기다리지를 못한다.
// Thread.sleep(3000) 처럼 강제로 기다리게 하면 되긴한다… 당연히 이렇게 쓰면 안됨 ㅎㅎ
Launch vs Async
- Launch 는 Job을 반환한다.
- Async 는 Deffered 를 반환한다.
- Deffered는 Job을 상속한 클래스이고, 타입 파라미터를 통해 코루틴의 Return Value를 반환받을 수 있다.
- Java 의 callable vs runnable 로 생각하면 될듯.
어떻게 중단했다가 다시 재개할까??
OS에서 스케쥴링을 보면 스케쥴링 알고리즘에 의해 우선순위에 따라 결정된 스레드가 실행된다.
실행될 때, PCB(Process Control Block)에 저장된 프로세스 정보로 PC(Program Counter)가 메모리 주소로 이동하여 다음 명령을 실행하게 된다.
이걸 언어적으로 JVM 내에서 처리하도록 한 것이다.
기본 개념 : '상태' 를 가지고 있으면 다시 시작할 수 있다.
- 누가 호출했는지, (Caller)
- 어디까지 실행했는지, (Program Counter)
- 어떤 값들을 가지고 있는지 (Stack)
suspend fun plusOne(initial: Int) : Int {
val one = 1
var result = initial
result += one
return result
}
위의 함수가 컴파일되면 아래와 같다 (Labeling!)
// state.label을 이용해 어디까지 실행되었는지 구분 가능하다
fun plusOne(initial: Int, countinuation: Countination) : Int {
val state = continuation as empty ?: CoroutineImpl {…}
switch(state.label) {
case 0:
state.label = 1
val one = 1
sm.one = one
…
case 1:
val one = sm.one
var result = initial
case 2:
result += one
case 3:
return result
}
}
- CPS (Continuation passing style) 변환을 활용해 코드를 생성해낸다.
현재 시점까지 실행한 결과를 넘겨서 처리하게 만드는 소스코드 변환 기술.
자기가 처리할거 처리하고, 던지고 던지고 던지고……가 CPS.
코루틴 빌더
그렇다면 코루틴은 어떻게 만들까??
- 코루틴을 사용하려면 suspend만 붙이면 되지만, 코루틴을 만들기 위해서는 코루틴 빌더를 통해서 만들어야 한다.
- 코루틴 빌더란 코루틴을 생성하는 것이다.
- 다만 코루틴이 실행되기 위해서는 여러 요소들을 묶는 context(Job, Dispatcher, Scope) 안에서만 실행이 가능하다.
runBlocking {
launch {
println("a")
delay(1000L)
println("World!")
}
launch {
println("b")
delay(1000L)
println("World!")
}
}
runBlocking {
launch {
println("1")
yield()
println("3")
yield()
println("5")
}
launch {
println("2")
yield()
println("4")
yield()
println("6")
}
println("end")
}
Coroutine context
- coroutine의 context는 여러 요소의 셋트라고 보면 된다.
- Job
- scope
- distpatcher
Coroutine Dispatcher
- coroutine을 처리하기 위해 스레드에 할당하는 처리자이다.
- 코루틴이 실행될 스레드를 결정하는 녀석이라고 보면 된다.
- 가령 코루틴을 특정 스레드(풀)에서만 실행하게 하고 싶다던지, 아니면 제한이 없이 하고 싶다던지의 정책을 정할 수 있다.
Dispatcher Value | 의미 | 비고 |
미지정(특별히 명시하지 않았을때) | 부모(상위)의 Context를 상속받아 그대로 사용한다. | Default |
Dispatchers.Unconfine | 백그라운드 스레드 풀을 공유한다. | 루틴에 들어가는 스레드는 특정 스레드에 제한하지 않는다. coroutine을 처음 실행하는 스레드와 중단 이후 재진입하는 스레드가 다를 수 있다는 것. |
Dispatchers.Default | Default Thread Pool을 이용한다. | |
newSingleThreadContext or newFixedThreadPoolContext | 코루틴 실행을 위한 스레드 풀을 별도로 만들어 사용한다 |
Coroutine Scope
- 모든 코루틴 빌더는 코루틴 스코프를 상속한다. 그리고 코루틴 스코프에는 코루틴 컨텍스트를 가지고 있다.
- 코루틴 빌더는 코루틴 스코프를 상속하기 때문에, 코루틴 빌더는 반드시 코루틴 스코프를 필요로 한다!
- 여기서 코루틴 스코프는 상속 관계에 있는 코루틴 스코프의 컨텍스트에 자동으로 컨텍스트 요소들과 cancellation을 전파하게 된다.
즉, '코루틴 Scope 내에서 컨텍스트들간의 컨텍스트 요소와 cancellation을 전파 전략을 담당하게 되는 녀석' 이 라고 이해하면 된다.
그래서 상위 스코프가 지정되어있으면 자동으로 스코프를 전파하므로 하위에서 스코프 지정을 안해도 되는 것이다. (automatically propagate cancellation)
만약 runBlocking 스코프에서 상위 스코프의 Job이 취소가 되면, 하위 스코프의 Job도 취소가 되게 된다. (automatically propagate context elements)
참고로 scope는 어떤 데이터도 들고 있지 않다. 그냥 context만 가지고 있다. 자기 scope 내의 컨텍스트만 컨트롤이 된다는 것.
coroutine job
- coroutine의 상태 정보를 들고 있다.
- active 상태인지
- completed 상태인지
- callcelled 상태인지
- 이 job을 이용해 아직 끝나지 않은 job을 취소시키거나, join 하거나, await 할 수 있다.
그럼 실제 사례
들어가기 앞서 간단한 개념을 잡고 들어가면,
- runBlocking 은 coroutine Context 구현체 중 하나로, 외부의 스레드와 차단하여 coroutine을 해제(?)한다고 보면 될것 같다. runBlocking 내부 Scope은 외부의 코루틴과 co-working 하지 못한다. (중단하지 못한다)
Scope내에 진입한 스레드 1개만 이용 가능. 물론 내부에 자식 Context를 새로 만들면 이용 가능.- 코루틴 빌더를 만나게 되면 바로 실행하는게 아니라, Queue에 넣고 Dispatcher에 의해 실행되어야 한다. 만약 사용가능한 스레드가 모자르다면 당연히 실행이 되지 못한다.
- coroutine은 Default로 common Pool을 가지고 있다. (fork/join pool) GlobalScope로 코루틴을 지정하면 Context를 default로 정해서 실행하게 된다. (GlobalScope로 코루틴 빌더를 생성하면 Context 지정하지 않아도 된다. Default를 사용하기 때문)
- coroutine의 Dispatcher 는 결국 Event Loop.
runBlocking {
launch {
println("1")
println("3")
println("5")
}
launch {
println("2")
println("4")
println("6")
}
println("end")
}
// runblocking에 걸린 스레드 하나가 parent 가 되어서 (Scope) 해당 스레드가 순서대로 실행. (end,1,3,5,2,4,6)
// thread 는 1개.
runBlocking {
launch {
println("1")
yield()
println("3")
yield()
println("5")
}
launch {
println("2")
yield()
println("4")
yield()
println("6")
}
println("end")
}
// runBlocking 은 Coroutine Context이다. 외부의 스레드로부터 차단. (서로 간섭을 할 수 없다.)
// runblocking 된 스레드가 parent context(scope)가 되어 launch 한 2개의 코루틴을 commonPool에서 실행시킨다.
// thread 는 1개.
// 첫번째 코루틴 생성, (아직 실행 안됨- 스레드가 1개밖에 없기 때문이다. main 스레드가 양보하지 않았기 때문)
// 두번째 코루틴 생성,
// print end
// 첫번째 코루틴이 실행되고 양보를 하면 dispatcher가 다음걸 어떤걸 실행할지 결정한다.
// 두번째 코루틴이 실행되고 반복한다.
@Test
fun test1() {
runBlocking {
val list = mutableListOf<Deferred<Unit>>()
withContext(newFixedThreadPoolContext(4, "test")) {
for(i: Int in 0 until 10) {
val a = async {
printTest()
}
list.add(a)
}
}
awaitAll(*list.toTypedArray())
}
}
suspend fun printTest() {
println("before prcoess")
Thread.sleep(2000)
println("after process")
}
// 코루틴이 실행된 Context는 newFixedThreadPool 4개.
// 그러나 내부에서 Blocking
// 4개의 스레드가 Blocking 되고, 순서대로 풀리면서 다음 작업을 하게 된다. 총 6초 소요. -> Blocking 은 코루틴 자체의 장점이 사라짐..
// nonblocking API를 사용하면(Delay) 2초 소요.
Coroutine 단점?
- non-blocking 에서만 사용 의미가 있다.
- blocking이면 일시중지(양보)를 못하기 때문에 하나의 스레드가 하나의 Job을 계속 잡고 있게 되므로, 그냥 Thread Pool을 이용하는 정도밖에 되지 않는다.
- 사용하는 API가 non-blocking이 되는지 반드시 확인해야 한다.
- 개인적으로 blocking인 경우에는 pub/sub을 나누는 reactive 구조가 더 나아보인다.
- 일단 사용 자체는 쉬우나, 복잡한 상황이 생기면 깊이 있는 공부가 필요하다.
'개발 > Kotlin' 카테고리의 다른 글
Kotlin - Generics (1) | 2019.12.04 |
---|
Comments