mm Home

Reactive Programming 본문

개발/기타

Reactive Programming

사용자 jess_m 2017. 11. 10. 11:19

최근 자바 9에서는 Reactive Streams API 가 추가되었다.

비동기 논블록킹의 개발방식이 떠오르면서 자바에서도 도입한것 같다. Reactive가 왜 나오게 되었는지, 어떤 개발 방식인지 살펴 보자. 

간단하게 자바 9로 예제도.


 


 

기존의 절차지향적인 개발방식에서 Reactive 프로그래밍 패러다임이 떠오름.

  • 왜 Reactive ?
    • 기존의 절차지향적이고 쓰레드풀을 이용한 처리의 문제점.
      • 블록킹이 걸려있는 상태에서 대기하고 있는 쓰레드의 성능 문제 (특히 MSA 같은 서비스에서 API 콜 수행시 부하도 없이 쓰레드를 붙잡고 있는 상황 연출)
      • 쓰레드의 변경마다 발생하는 context-switch 비용
      • 한정적인 자원의 쓰레드 풀. 
      • 쓰레드간 공유하는 자원으로 인한 side-effect 가능성.
    • Event-driven 형식의 개발로 Responsive 하게 개발하자!
      • 빠르고 일관된 응답 시간 (막힘없는)
      • 발생한 에러에 대한 적절한 응답
      • 발생한 에러의 적절한 회복
    • 기존의 Event-Driven 형식의 callback 개발은.. callback hell 을 야기.
    • 기존의 절차지향적인 Control의 흐름에서 Data의 흐름을 보기 시작.   → 비동기로 흐르는 데이터를 함수형 개발방식으로 컨트롤!!
      • 흐르는 데이터를 함수형의 stateless 하고 side-effect-free 한 개발!
    • 실제 비즈니스 로직의 isolation

 


  • java 9는?
    • Publisher : 데이터 제공자. 구독한 구독자들에게 구독 정보를 토대로 데이터를 제공한다.
    • Subscriber : 데이터 소모자. 제공자로부터 데이터를 받아 소모한다.
    • Subscription : 구독 정보. Subscriber는 Publisher를 구독하여 데이터(n) 요청할 수 있다. 

 

    • Publisher 
      • subscribe(Subscriber<? super T> subscriber) : 구독자가 구독을 시작함. Publisher가 구독자를 관리하기 시작.
    • Subscriber
      • onSubscribe(Subscription subscription) : 지정된 구독에 대한 구독 시작 (구독 상태에 놓여있다)
      • onNext(T item) : 구독의 request Iterm을 실행하는 메소드
      • onError(Throwable throwable) :  퍼블리셔나 구독에서 에러가 발생할때 호출되는 메소드
      • onComplete() : Subscriber가 더 이상의 구독을 하지 못할 때 호출되는 메소드
    • Subscription
      • request(long n) : 이 구독에 대한 n 개의 데이터를 추가
      • cancel() : request 취소

 


Publisher와 Subscriber, Subscription의 관계를 그림으로 보자.

http://javasampleapproach.com/java/java-9/java-9-flow-api-reactive-streams


Publisher는 구독자(Subscriber)를 구독하게 한다.(subscribe()) 구독자는 구독 정보(Subscription)을 토대로 구독상태에 놓여지게 된다. (onSubscribe()).  구독 정보를 가지고 Publisher에게 데이터를 요청(request) 하기도 하고 요청한 데이터를 취소(cancel) 하기도 한다. 그리고 전달받은 데이터를 소모시킨다(onNext()) . Subscription이나 Publisher에게 에러가 발생했을때는 onError()메소드가 호출될 것이고, 더 이상 요청할 수 있는 데이터가 없다면 onComplete()가 호출된다. 




Reactive가 데이터의 흐름이라고 한 것처럼 데이터의 흐름을 그림으로 보면, 일련의 데이터 Stream은 operator를 거쳐 정제될 것이다. Operator를 통하여 원하는 방식대로 데이터를 가공하는 것이다. 


https://raw.githubusercontent.com/reactor/reactor-core/v3.1.0.RC1/src/docs/marble/flux.png

 

 



Java 9에서 추가된 Reactive API로 간단한 예제를 만들어보자.

Publisher는 구현이 복잡하므로.. JDK내에 기본 제공해주는 Publisher 구현체를 사용했다

 

Reactive 예제

  • JDK내 제공해주는 publisher 구현체를 사용.
  • SubmissionPublisher : submit 을 통해 데이터를 받으며, close 할 때 까지 구독한 구독자에게 publish 함.

 

Subscriber

public class UserSubscriber<T> implements Subscriber {
 
    private Subscription subscription;
 
    private static final Logger log = LoggerFactory.getLogger(UserSubscriber.class);
 
    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }
 
    @Override
    public void onNext(Object item) {
 
        log.info("Got : " + item);
 
        //TODO : Something
 
        subscription.request(1);
    }
 
    @Override
    public void onError(Throwable throwable) {
        log.error(throwable.getMessage());
    }
 
    @Override
    public void onComplete() {
        log.info("onComplete");
    }
}

 

 

Main

public class UserPubSub {
 
    private static final Logger log = LoggerFactory.getLogger(UserPubSub.class);
 
 
    public static void main(String[] args) throws InterruptedException {
 
        log.info("start");
 
        //publisher의 스레드풀 설정
        final ExecutorService executor = Executors.newFixedThreadPool(10);
 
        //Data stream
        List<User> collect = Stream.iterate(1, n -> n+1)
                .map(i -> new User("test" + i, i)).limit(50)
                .collect(Collectors.toList());
 
         
        SubmissionPublisher<User> publisher = new SubmissionPublisher(executor, 100);
         
        //구독 시작
        publisher.subscribe(new UserSubscriber<>());
        //publisher에 Data submit
        collect.forEach(publisher::submit);
 
        //프로세스가 먼저 죽으면 안되니 잠깐 기다림
        ForkJoinPool.commonPool().awaitTermination(30, TimeUnit.SECONDS);
        publisher.close();
 
        log.info("end");
    }
}

 

start와 end가 바로 찍히고 나서 Pub/Sub이 발생하는걸 볼 수 있다.  (비동기이기 때문에 당연히..)

 



'개발 > 기타' 카테고리의 다른 글

Netty  (0) 2017.11.17
Reactive Programming  (0) 2017.11.10
HTTP 동작 과정  (0) 2017.10.26
OSI 7  (0) 2017.09.29
Hystrix  (0) 2017.08.17
0 Comments
댓글쓰기 폼