mm Home

HADOOP 본문

개발/Big Data

HADOOP

jess_m 2019. 8. 12. 16:33

하둡이란?

분산처리를 위한 프레임워크 시스템. (2.x 기반 설명..)

 

주요 모듈

HDFS

MapReduce

YARN

 

HDFS (Hadoop Distributed FileSystem)

hadoop의 분산 파일시스템. 파일 시스템이라 하면, 데이터를 어디에 어떻게 저장할 것이고 어떻게 관리할 것인지에 대한 시스템. 

OS의 파일시스템 레이어와 다르다. 실제로 남기는 데이터는 OS의 파일시스템을 사용.

 

 

 

주요 특징.

  • 처리 가능한 데이터 사이즈 커짐 (Petabytes)
  • 이미 존재한 파일에 append는 가능하지만 수정이 불가능하다. (Write once, Read many times)
    • 파일 이동, 삭제, 복사 등의 기능 있음.
  • block
    • 파일시스템의 블록.
    • 블록 사이즈 : 128MB (default)
      • 실제 OS 파일시스템처럼 파일크기가 블록 사이즈에 미달하여도 블록 사이즈 단위만큼 할당되지 않는다.
        • 그저 파일 관리를 블록 사이즈만큼 한다는 뜻.
    • 일반적인 파일시스템과 다르게 큰 블록 사이즈.
      • 블록 검색이 많이 발생하지 않음. 
        • 블록 사이즈가 4kb 라면, 1G 파일 하나에 262144 블록 갯수를 가진다. 저 블록들을 다 검색하려면 당연히 비효율적. (블록 사이즈가 128Mb 라면 8개이므로 당연히 효율적.)
        • 그런데 어차피 OS 파일시스템에서 4kb이니 똑같은거 아닌가??
          • 아님. os의 파일시스템에서 인접한 연속 블록을 검색하므로(128mb만큼) 부하 적음.
      • 지속적인 TCP  연결 유지로 오버헤드 줄일 수 있음.
    • 반대로 너무 블록 사이즈를 크게만 잡는다면, 병렬 수행이 어려움.
  • namenode 와 datanode
    • namenode : 파일시스템에서 메타 데이터를 관리하는 노드.
      • 파일 시스템의 디렉토리 구조와 파일의 각 블록에 대한 정보를 가지고 있다. (해당 메타 데이터를 메모리에 들고 있어서 파일에 대한 수정이 일어날 경우 disk i/o 없이 가능)
      • 데이터 노드 모니터링. (heartbeat, block report)
      • 블록 관리 (블록을 어느 데이터 노드에 쌓을지, 잘못된 블록은 새로운 데이터노드에 쌓도록 함)
      • 클라이언트 request 확인 (클라이언트의 권한 확인이나 기존의 파일 여부와 같은 확인 절차)
      • 파일의 inode 가 잘못되거나 유실될 경우, 클러스터를 수천대로 구성되어있다 하더라도 무용지물이 되어버림. HA구성으로. (active - standby)
    • datanode : 실제 hdfs에서 관리하는 데이터를 저장하는 노드.
      • datanode는 주기적으로 block scanner(별도 스레드)가 블록을 스캔하여 손상을 감지한다. 그 결과를 네임노드에 블록 리포트를 전달하여 정상적인지 알려줌. 
        (만약 손상된 블록을 발견하면 네임노드에 전달하여 손상된 블록을 복제하도록 함.)
  • secondary namenode
    • 보조 노드. standby namenode 가 아니다.
    • hdfs는 기동시 마지막으로 저장되어있던 스냅샷 (fsImage)를 불러오고, 불러온 스냅샷에 변경 로그(Edit log)를 적용(merge) 하여 메모리에 올려둔다.
      • fsImage : 특정 시점의 스냅샷  (snapshot period option : dfs.namenode.checkpoint.check.period)
      • edits log : 마지막 스냅샷 이후의 변경 사항.  
        edits log는 중요한 자료이기 때문에 journal node라는 새로운 데몬을 띄워서 여러 서버에 복제 한다. (저널 노드 최소 3개)
        active namenode 만 edits log에 대한 write 권한이 있다. (read는 둘다 있음)
    • 여기서 마지막 fsImage 로부터 edit logs가 많다면 기동시 오래 걸림(edits log 의 데이터가 너무 많아서 메모리가 너무 커진다면 기동시 문제가 생길수 있음 (용량 제한도 없음)).. 그걸 보조해주는게 secondary namnode.
    • hadoop 2버전에서는 잘 사용안한다. (주로 journal node 를 통해 HA 구성을 하기에.)
  • write
    •  
    • NameNode에 어디에 create 요청을 하면, 어느 노드(list) 에 쓸지 알려준다. 
    • 첫번째 DataNode에 파일을 전부다 쓰고, 다른 DataNode에 복제한다. (해당 노드에 저장하지 않을 block도 일단 local에 쓰고 복제한다.)
    • 파일이 여러개의 블록으로 이루어질 경우. 블록 사이즈만큼 다 쓰게 되면 클라이언트가 namenode에 완료되었음을 알리고 close 처리를 한다.
  • read
    • 네임노드에 어느 데이터 노드를 바라봐야하는지 묻고(네임 노드는 블록 리스트를 반환한다), client가 직접 데이터노드에 요청을 보내게 된다. 
    • sequentially하게 데이터를 가져온다.
  • replication
    • 설정한 수만큼 블록 단위를 복제한다.
    • default : 3
  • Rack Awareness
    • 랙 인식 기능. 실제 노드간 물리 랙 위치를 인식하여 가까운 노드에게 위임할 수 있도록 하는 설정.
    • read/write 시 근접 노드를 선택하도록 해준다.
    • mapreduce 에서도 데이터 재구성시 인접한 랙에 있는 데이터를 활용하여 네트워크 비용 절감.
    • 블록 복제시에도 랙 인식 기능을 통해 다른 랙에 복제. (하나의 랙에 장애발생시, 복제본이 모두 하나의 랙에 존재할 경우 장애.)
    • 설정해야 한다.

 

여기서 파일의 수가 너무 많을때

  • 블록 메타 데이터는 name node의 메모리에 올라가있다. 너무 많은 파일은 name node의 힙 메모리를 모자라게 하여 중단되는 상황이 발생할 수도 있음.
    • 네임노드는 파일 크기가 작건 크건 상관안한다. 메타 데이터를 관리하므로.
    • 너무 많은 메타 데이터는 GC에도 영향을 받아 성능 저하가 생길 것이고, 또 그 상태로 많은 요청이 오게되면 병목을 발생시킬 수 있다.
  • block report의 부하 증가
    • block report는 모든 블록을 지속적으로 추적하여 검사하는데, 블록이 많을 수록 많은 요청이 생기게 됨.
  • MapReduce 성능 저하
    • 작은 크기의 파일수는 많은 블록을 읽어들여야 하는 문제가 있다.
    • 당연히 각 블록은 디스크에 연속적으로 위치해있지 않다. 랜덤 access가 커지므로 성능이 저하됨. 
    • 데이터 블록 하나당 map 작업이 할당된다. 10000개의 블록이 있다면 10000개의 맵 작업이 수행되는 것. JVM에 올리고 내리는 작업까지 오버헤드가 있음.
  • hadoop 3 버전에서 해결을 위해 작업중이라고 함.

 

Cloudera의 "Sizing NameNode Heap Memory" 문서를 참고하면, 백만 개 블록마다 1GB의 heap을 설정 권장. (힙 메모리를 그냥 무작정 늘려 놓으면 당연히 GC에서 성능 저하)

 

 

 

 

 

mapreduce

분산 환경에서 병렬 데이터 처리 기법으로 map과 reduce 함수를 통해, 데이터를 필터링, 가공, 정렬하도록 하는 프로그래밍 기법. key-value 형식의 데이터 처리.

큰 문제를 다루기 쉬운 조각으로 분해하고 각 조각을 독립적으로 조작 한 다음 모든 조각을 다시 조립하는 전략.

Mapper 와 Reducer 를 직접 구현하여, hadoop 클러스터에 Job을 제출하는 방식으로 동작. (Mapper와 Reducer는 map, reduce 작업을 수행하는 객체이다)

 

 

보통 아래의 순서로 동작한다.

 

driver 라고 불리우는 Main 클래스에 mapreduce 구성 설정을 하게 되고, 아래의 순서에 따라 동작.

  1. input read
    1. Input Format에 따라 Map 작업을 준비하게 된다
    2. 앞에서 key-value 데이터를 처리한다고 했는데, 일반 텍스트 파일에는 key가 없음.
    3. 그래서 TextInputFormat (default) 의 경우. byte offset (by line) 과 text가 (key, value) 로 map 에 전달된다.
    4. 직접 구현하여 사용해도 되고, 개발되어 있는 다른 구현체를 사용해도 됨. 
  2. map
    1. 각 작업자 노드(worker) 는 local data 에 map 함수를 적용하여 output을 임시 저장소에 write 한다.
    2. 주로 (key, value) 데이터를 필터링하거나 다른 값으로 변환하는 작업을 수행.
    3. output에 대한 (key, value) 를 정의.
  3. shuffle 
    1. worker 노드들은 key(map의 output key임) 를 통해 데이터를 재구성한다. 
    2. 크게 보면, map 태스크 이후 output에 대하여 데이터를 재구성하여 reducer에 전달해주는 작업.
    3. 같은 key를 가진 데이터는 동일한 작업자 노드에서 동작하기 위함. 가령 key 별로 정렬하고 싶은 경우.
    4. Partitioner는 셔플에 포함된 작업이라고 보면 편함. (실제로는 파티셔너 객체가 따로 존재함.)
      1. HashPartitioner (default) : map의 output Key의 해시코드를 확인하여 담당할 리듀서를 정한다.
        1. 예를들어. key: Apple, key: Grape 가 있다면.. Apple 은 해시코드가 1111, Grape는 2222, Reducer갯수가 3개 일 경우, Reducer 갯수로 % 연산한다. 1111 % 3 하여 나온 값으로 Reducer 0, Reducer 1, Reducer 2에 각각 할당
        2. custom partitioner 가능.
    5. (the Map output to the Reduce processors. 밑에서 다시 설명)
  4. reduce : map과 shuffle을 거쳐 key, value로 재구성된 데이터를 reduce 함수 처리.
    1. key로 grouping 하여 집계(aggregation)된 결과를 input으로 받는다.
      • Map 클래스와 거의 비슷하지만, input value 가 Iterable 객체.
    2. input 으로 들어올때 key별로 sort가 되어져서 들어온다.
      1. 예를들어 아래와 같이 reduce에 들어온다면... 
      2. input : (key1, value1), (key1, value2), (key2, value1)
      3. output : (key1, [value1, value2]), (key2, [value1])
  5. output write
    1. Output Format 에 따라 key -  value 구조가 출력파일에 기록된다.
      • Input Format 과 마찬가지..
      • ex. TextOutputFormat (default)의 경우 line마다 "key \t value" 형태로 기록한다.

 

 

실제 데이터의 흐름 (wordCount)

  • Input 데이터
    Apple Grape Apple
    Grape Apple Mango
    Apple Pineapple Apple Grape
  • Input read  (Mapper 3개 라는 가정 - 아래는 map의 input)
    Mapper A : (1, Apple Grape Apple)
    Mapper B : (2, Grape Apple Mango)
    Mapper C : (3, Apple Pineapple Apple Grape)
  • map 수행. (map의 output)
    Mapper A : (Apple, 1), (Grape, 1), (Apple, 1)
    Mapper B : (Grape, 1), (Apple, 1), (Mango, 1)
    Mapper C : (Apple, 1), (Pineapple, 1), (Apple, 1), (Grape, 1)
  • shuffle & partitioning
    Reducer A : (Apple 1), (Apple 1), (Apple 1)
    Reducer B : (Grape, 1), (Grape, 1)
    Reducer C : (Pineapple, 1), (Mango, 1)
  • sort
    Reducer A : (Apple 1), (Apple 1), (Apple 1)
    Reducer B : (Grape, 1), (Grape, 1)
    Reducer C : (Mango, 1), (Pineapple, 1)
  • reduce 입력
    Reducer A : (Apple, [1,1,1])
    Reducer B : (Grape, [1,1])
    Reducer C : (Mango, [1]), (Pineapple, [1])
  • reduce 함수
    Reducer A : (Apple, 3)
    Reducer B : (Grape, 2)
    Reducer C : (Mango, 1), (Pineapple, 1)

 

 

여기에 추가로.

  • Combiner
    • Reducer로 전달되기 전에 Mapper 출력 레코드를 동일한 키로 요약하는 Mini-Reducer 라고도 한다.
    • Mapper는 많은 양의 중간 데이터를 생성하고이 중간 데이터는 차후 처리를 위해 Reducer에 전달되어 네트워크 혼잡을 초래할 수 있다.
    • 그래서 Combiner 를 통해 데이터의 양을 줄일 수 있다. (Combiner는 optional임.)
    • combiner 를 통해 map output 을 줄여서 disk, network i/o를 줄일 수 있다. 
      • 종종 네트워크 통신비용이 컴퓨팅 비용을 넘어서는 경우가 있다..
    • 위의 예시에서 보면 
      • 매퍼가 리듀서에게 전달해주기 이전에 연산을 value를 결합하여 전달해주는 것.
        아래의 매퍼 출력을
        Mapper A : (Apple, 1), (Grape, 1), (Apple, 1)
        Mapper B : (Grape, 1), (Apple, 1), (Mango, 1)
        Mapper C : (Apple, 1), (Pineapple, 1), (Apple, 1), (Grape, 1)
      • 아래와 같이 결합된 상태로 리듀서에 전달해주는 것.
        Mapper A : (Apple, 2), (Grape, 1)
        Mapper B : (Grape, 1), (Apple, 1), (Mango, 1)
        Mapper C : (Apple, 2), (Pineapple, 1), (Grape, 1)

 

  • 생성되는 Mapper의 갯수는 input 에 따라.
    • 특수한 input format이 아닌 경우에는 block 수가 생성할 Mapper 갯수가 된다. 
    • ex. input으로 들어올 hdfs 블록 200개면 생성할 Mapper 갯수가 200개.
    • 생성할 mapper 갯수가 200개라고 동시에 200개가 생성되어서 실행되는건 아니고, parallel 하게 진행되는건 yarn 설정에 따라)
  • 생성되는 Reducer의 갯수는 설정에 따라.
    • 아무 설정을 하지 않는다면 1. (mapreduce.job.reduces)

 

 

 

최근에.

  • 구글은 데이터 분석을 위해 더 이상 mapreduce 를 사용하지 않는다고 함. 관련 article
    • 빠르게 처리가 되지 않음.
    • 배치와 스트리밍의 간극.
    • 항시 클러스터 관리 (운영 및 배포)
    • Mahout 을 사용한다고 함. (기타 등등..)
      • Hadoop 위에 구현되고 MapReduce 패러다임을 사용하는 Apache Software Foundation의 프로젝트. 
        클러스터링, 협업 필터링 및 분류 분야에 초점을 맞춘 확장 가능하고 분산 된 기계 학습 알고리즘의 구현을 작성하는데도 사용

'개발 > Big Data' 카테고리의 다른 글

Flink - Watermarks  (0) 2019.12.11
Flink - Event Time  (0) 2019.12.09
Comments