본문 바로가기
Data Science/분산처리기술

Hadoop 개념 - MapReduce의 흐름에 따른 정의

by En.Lee 2014. 11. 23.

MapReduce 의 데이터 흐름

HadoopTutorialF0404

Figure 4.4: High-level MapReduce의 pipeline

HDFS 클러스터에 파일이 적재됨으로써 MapReduce 입력이 시작된다. 이들 파일은 전체 node에 균등하게 배분되는데 이에 대해 MapReduce 프로그램이 수행되면서 node에서는 mapping task가 시작된다. 이때 각각의 mapping task는 동등한 것으로서 이들 서로를 구별할 수 없으며 각 mapper는 그 어떤 입력파일도 처리할 수 있다. 각각의 mapper는 각 기기에 인접해 존재하는 파일들을 적재한 후 그 컴퓨터가 처리하게 한다.

mapping 단계가 끝나면 중간산출물로서의 intermediate (key, value) pair가 각 컴퓨터 사이에서 교환되고 같은 key를 가지는 모든 value들은 하나의 reducer에게 보내진다

mapper가 있는 node에는 reduce task들 역시 포함된다. 이러한 분배작업이 MapReduce에서 유일하게 통신이 이루어지는 부분이다. 개별적인 map task는 상호간에 정보를 교환하지도 않을뿐더러 상호간의 존재조차 알지 못한다. 마찬가지로 각각의 reduce task들도 상호 간에 통신을 하지 않는다. 사용자 역시 기기간의 정보에 대해 명시적으로 간섭하지 않는다. 모든 데이터 전송은 Hadoop의 MapReduce 플랫폼에 의해 이루어지며 이때도 key를 중심으로 해당 value를 주고 받는 방식을 취한다. 바로 이런 점이 Hadoop MapReduce이 높은 신뢰성을 보장받는 가장 큰 이유이다. 

클러스터 내의 node가 장애를 일으키면 task는 재수행된다. 만약 이들에게 소위 side-effects를 수반된다면 (예: 외부와의 통신을 하고 있었다는 등) 그 상태정보 역시 재수행 하는 새로운 task에 복구시킨다. 통신과 side-effect의 문제를 제거하고 차단함으로써 재시동은 자연스럽게 이루어질 수 있다.

세부 동작 원리

HadoopTutorialF0405

Figure 4.5: Hadoop MapReduce의 데이터 흐름에 대한 상세한 모습

Figure 4.5에서는 각 단계에서의 데이터 흐름을 pipeline으로 보여주고 있다.

입력 파일: MapReduce task를 위한 데이터가 저장되어 있는 곳으로서 보통 HDFS상에 존재할 것이다. 

파일 포맷은 그때 그때 다르다: line기반의 log 파일도 있을 수 있고, binary 포맷, multiline의 입력 레코드 등 그 어떤 것도 가능하다. 다만, 용량은 매우 클 것이다.

InputFormat: 입력파일이 분할(split)되는 방식이나 읽어들이는 방식을 정의하는 class이다.

  • 입력으로 사용될 파일 또는 기타의 object를 선택한다.
  • 파일을 task로 분해할 InputSplits 을 정의한다.
  • 파일을 읽어들이는 RecordReader 를 생성하는 factory를 제공한다.

Hadoop은 여러 개의 InputFormat을 제공한다. 

FileInputFormat 라는 이름의abstract type이 존재하며; 파일을 대상으로하는 모든 InputFormat은 이 class로부터 파생된다. 

Hadoop의 job을 수행하기 시작할 때 FileInputFormat에 읽으려고 하는 파일의 경로를 제시하면 FileInputFormat이 이 디렉토리에 있는 모든 파일을 읽어 들인다. 그런 후 각각의 파일을 여러 개의 InputSplit으로 분해한다. 

개발자는 입력파일에 어떤 InputFormat을 적용할지에 대해서는 job을 정의하는 JobConf object의 setInputFormat() method를 호출하는 방식으로 정의한다. 표준의 InputFormat이 다음 표에 정의되어 있다.

InputFormat:

설명

Key:

Value:

TextInputFormatDefault 포맷; 텍스트 파일의 각 line을 읽어들인다.각 line의 byte offset각 line의 내용
KeyValueInputFormat각 line을 key, val pair로 parse한다.첫째 tab 문자까지의 모든 내용line의 나머지 내용
SequenceFileInputFormatHadoop고유의 고성능 바이너리 포맷사용자 정의사용자 정의

Table 4.1: MapReduce가 제공하는 InputFormats

TextInputFormat은 디폴트의 InputFormat으로서 입력 파일의 각 line을 별개의 레코드로 취급하지만 별도의 parsing작업은 하지 않는다. 이것은 특히 unformatted 데이터 또는 log 파일과 같은 line기반의 레코드 등에 유용하다.

보다 흥미로운 입력 포맷은 KeyValueInputFormat으로서 이 역시 입력파일의 각각의 line을 별개의 레코드로 취급한다. 다만 TextInputFormat이 line 전체를 하나의 값(value)으로 여기는 반면 KeyValueInputFormat은 각 line을 tab 문자를 기준으로 key와 value로 분해한다는 점이 다를 뿐이다. 이는 특히 하나의 MapReduce job의 출력물을 읽어서 다른 MapReduce job의 입력항목으로 전달하는데 유용하다. 디폴트 상태에서의 OutputFormat (뒤에 설명) 이 그 결과를 이런 방식으로 포맷팅 하기 때문이다.

끝으로 SequenceFileInputFormat 은 Hadoop에 특수한 바이너리 파일을 읽어 들인다. 이 파일에는 Hadoop mapper에 고속으로 읽어들일 수 있도록 몇 가지 부가기능이 제공된다. Sequence 파일은 block-compressed 방식이고 (텍스트 이외에도). 몇 가지의 임의의 데이터 타입을 직접 serialization 및 deserialization할 수 있는 기능도 있다. Sequence 파일은 다른 MapReduce task의 출력물로 생성될 수도 있으며 한 MapReduce job에서 다른 MapReduce job으로 전달되는 임시 데이터의 표현형식이 되기도 하다.


InputSplits(Job): InputSplit은 MapReduce 프로그램에서 map task를 구성하는 작업의 단위가 된다. Data set에 적용되는 MapReduce 프로그램은 이를 총체적으로 Job이라고 부르며 이 job은 여러 개의 (또는 수 백개의) task로 구성된다. Map task는 한 파일의 전체를 읽거나 일부분만을 읽을 수도 있다. 

디폴트 상태에서 FileInputFormat과 하위 class는 파일을 64 MB 단위의 chunk (HDFS에서의 블록 크기와 동일)로 분할한다. 이 값은 hadoop-site.xml에서 mapred.min.split.size 파라미터를 이용하거나 특정한 MapReduce job을 submit 시키는 기능을 가지는 JobConf object에서 파라미터를 override함으로써 변경할 수 있다. 파일을 chunk 단위로 처리하면 하나의 파일에 대해 여러 개의 map task를 병렬적으로 수행할 수 있게 된다. 파일이 매우 큰 경우 이러한 parallelism을 통해 성능을 획기적으로 높일 수도 있다. 더욱 중요한 것은 파일을 구성하는 다양한 블록을 클러스터 내의 여러 node에 분배할 수 있다는 것이다. 즉, 각각의 블록을 한 node에서 다른 node로 옮길 필요가 없이 해당 node에서 (locally) 처리할 수 있다는 것이다. 물론 log 파일과 같은 것은 이처럼 분할하여 처리가 가능하지만 어떤 파일포맷은 chunk단위로 쪼개기 어려울 수도 있다. 이때는 custom의 InputFormat을 작성해서 파일을 어떻게 분할할지를 (또는 일정조건에서는 분할되지 않도록 할지를) 상세히 지정할 수도 있다. Custom 입력포맷은 Module 5 에서 설명한다.

InputFormat은 mapping 단계에서의 각각의 task의 목록을 정의한다. 각각의 task는 각각 입력되는 split에 대응된다. 이들 task는 입력파일의 chunk가 물리적으로 어디에 위치하고 있는지를 기준으로 각 node에 할당된다. 개별 node마다 수십 개의 task가 할당된다. 각 node가 task에 대해 작업을 개시할 때는 가능한 많은 작업을 병렬처리하도록 노력한다. 이러한 각 node별 parallelism은 mapred.tasktracker.map.tasks.maximum 파라미터를 통해 통제된다.

RecordReader: InputSplit을 통해 일의 단위가 지정되었지만 이를 액세스하는 방법은 정의되지 않는다. 데이터를 source에서 실제로 적재한 후 이를 Mapper가 읽기에 수월한 (key, value) pair로 변환하는 일은 RecordReader class가 담당한다. RecordReader instance는 InputFormat에 의해 정의된다. 디폴트의 InputFormat인 TextInputFormat LineRecordReader를 제공하는데 여기서는 입력파일의 각각의 line을 새로운 값(value)로 취급한다. 각 line에 대한 key는 파일에서의 byte offset이다. 입력 시 InputSplit 전체가 완료될 때까지 RecordReader는 계속 호출(invoke)된다. RecordReader가 호출되면 Mapper의map() method 역시 호출된다.

Mapper: Mapper는 MapReduce 프로그램의 첫째 단계로서 사용자가 정의한 작업을 수행한다. Key와 value가 주어지면 map() method는 (key, value) pair(s)를 Reducer에게 전달한다. 새로운 Mapper instance는 각각의 map task (InputSplit)에 대한 별도의 Java 프로세스 속에서 만들어진다. 이러한 map task는 전체적으로 한의 job input을 구성한다. 각각의 mapper는 다른 mapper와 어떤 방식으로도 통신하지 않는데 이는 의도적인 것이다. 이를 통해 각각의 map task의 신뢰성이 로컬 기기의 신뢰성에 전적으로 좌우되게 되는 것이다.map() method는 key와 value 이외에 2개의 파라미터를 전달받는다.

  • OutputCollector object에는 collect() 라는 method가 있어서 (key, value) pair를 job의 reduce 단계로 전달해 준다.
  • Reporter object 는 현재의 task에 대한 정보를 제공한다. Reporter의 getInputSplit()method는 현재의 InputSplit을 설명하는 object를 반환한다. 또한 map task로 하여금 진행상태에 대한 추가의 정보를 시스템 내 다른 요소에게 제공할 수도 있다. 또한setStatus() method 를 통해 사용자에게 상태메시지를 제공할 수도 있다.incrCounter() method를 통해서는 shared performance counter를 증가시킬 수도 있다. 필요한 임의의 counter를 정의할 수 있으며 각 mapper는 counter를 증가시킬 수 있는데 JobTracker는 여러 프로세스에서 진행된 counter증가값을 수집한 후 이를 합계처리하여 job이 수행종료되었을 때 꺼내보게 된다.

Partition & Shuffle: 첫번째 map task가 종료한 후에도 각 node들은 여러 개의 다른 map task들을 수행하고 있을 수도 있다. 그런 가운데서도 map task로부터의 중간산출물을 이를 필요로 하는 reducer에게로 전달하기 시작한다. 

이처럼 map의 산출물을 reducer에게로 옮기는 것을 shuffling한다고 부른다. 중간단계 key space의 일부가 각각의 reduce node에 할당된다. 이들 subset (이를 “partition”이라고 부른다 - Key에 따라 정렬된 데이터들로 클러스터되어 리듀서로 전달되는것)들은 reduce task에게 입력된다. 

각 map task는 그 어떤 partition에도 (key, value) pair를 전달할 수 있다. 하나의 key에 대한 모든 값은 항상 그 origin이 어떤 mapper였든 상관없이 병합(reduced together)된다. 따라서 중간산출 데이터의 각 항목을 어디로 보낼지에 대해 map node는 의견일치를 보아야 한다.Partitioner class는 주어진 (key, value) pair가 어떤 partition으로 갈지를 결정하는데 디폴트의 partitioner 는 key에 대한 hash 값을 계산한 후 그 결과에 따라 partition을 할당한다. Custom partitioner에 대해서는 Module 5 에서 설명.


Sort: 각각의 reduce task는 여러 개의 중간 key에 관련된 value를 합산(reduce)한다. 개별 node에서의 일련의 중간 key는 Hadoop이 이를 자동으로 정렬한 후 Reducer에게 보내진다.

Reduce: 각각의 reduce task에 대해 Reducer instance가 만들어진다. Reducer instance는 사용자가 제공하는 instance로서 job별로 중요한 2번째 단계가 된다. Reducer에게 할당된 partition에서의 각각의 key에 대해 Reducer의 reduce() method 는 단 한번 호출되는데 이를 통해 key에 연결된 모든 value에 대한 iterator와 key를 받는다. iterator에 의해 하나의 key와 관련된 value들이 반환될 때 그 순서는 무작위이다. Reducer는 또한 OutputCollector Reporter object를 파라미터의 형식으로 받게 되는데 이들은 map() method에서와 같은 방식으로 이용된다.

OutputFormat: OutputCollector에게 제공되는 (key, value) pair는 출력파일에 기록된다. 실제 기록되는 방식은 OutputFormat에 의해 결정된다. OutputFormat 은 앞서의 InputFormat class 와 같은 방식으로 동작한다. Hadoop이 제공하는 OutputFormat의 instance는 로컬디스크 또는 HDFS상의 파일에 기록된다. 이들 모두 일반적인FileOutputFormat에서 상속된 것이다. 각각의 Reducer는 각각의 파일을 일반적인 출력 디렉토리에 기록한다. 이들 파일은 통상 part-nnnnn 라는 이름을 가진다. (nnnnn 는 reduce task와 관련된 partition id이다.) 출력 디렉토리는 FileOutputFormat.setOutputPath()method에 의해 결정된다. 특별한 OutputFormat을 사용하려는 경우에는 MapReduce job을 정의하는 JobConf object 의 setOutputFormat() method를 통해 지정한다. 제공되는 OutputFormat은 다음과 같다.

OutputFormat:

설명

TextOutputFormatDefault; line을 “key \t value” 형태로 기록한다
SequenceFileOutputFormat뒤에 오는 MapReduce job으로 읽어 들이기에 적당한 형태의 바이너리 파일로 기록한다.
NullOutputFormat입력을 무시한다

Table 4.2: OutputFormats provided by Hadoop


Hadoop은 파일에 기록하기 위한 몇 가지 OutputFormat을 제공한다. 디폴트 상태의 instance는 TextOutputFormat으로서 텍스트파일의 각 line에 (key, value) pair를 기록한다. 

이를 나중에 MapReduce task를 통해 KeyValueInputFormat class로 다시 읽어들일 수 있는데 이는 사람도 읽을 수도 있다. MapReduce job들 상호간에 이용할 수 있는 더 좋은 중간 포맷이 SequenceFileOutputFormat 인데 이는 임의의 데이터 타입을 파일로 신속하게 serialize해 준다; 이에 대응되는 SequenceFileInputFormat 은 파일을 같은 타입으로 deserialize 하고 앞서의 Reducer가 산출했던 것과 같은 방식으로 다음 Mapper에게 전달한다. NullOutputFormat 은 아무런 출력파일을 만들지 않으며 OutputCollector에 의해 전달받은 (key, value) pair를 무시한다. 이는 reduce() method에서 독자의 출력파일에 기록하고 Hadoop 프레임워크에 의해 추가의 빈 출력파일이 만들지 않으려는 경우 유용하다.

RecordWriter: InputFormat이 실제로 개별 레코드를 RecordReader 실행을 통해 읽는 것과 마찬가지로 OutputFormat class도 RecordWriter object에 대한 factory역할을 한다. 이들은 OutputFormat에 지정된 대로.개별 레코드를 파일에 기록하는데 이용된다.

Reducer에 의해 작성된 출력파일은 HDFS에 남아있으므로 다른 MapReduce job 또는 별도의 프로그램 또는 사용자의 직접개입을 통해 이용할 수 있다.

추가적인 MapReduce 기능

HadoopTutorialF0406

Figure 4.6: Combiner step이 MapReduce의 데이터 흐름에 포함되었다.

Combiner: 앞서의 그림에서 생략된 프로세스가 하나 있다. Combiner라고 불리우는 것인데 이것을 이용하면 MapReduce job이 사용하는 대역폭을 최적화하고 절감할 수 있게 된다.Combiner 는 Mapper와 Reducer사이에서 진행되는 것으로서 선택적으로 적용이 가능하다. Combiner를 적용하면 map task가 수행되는 모든 node에 대해 Combiner class의 instance가 적용된다. 각각의 node에서 Mapper instance가 산출한 데이터를 입력받고 Combiner가 지정된 작업을 하면 그 결과물을 Reducer에게 보낸다, Combiner는 일종의 “mini-reduce” 프로세스로서 하나의 단위 컴퓨터에서 생성된 데이터만을 대상으로 한다.

Word count프로그램이 대표적인 예가 될 수 있는데 listings 1–3에서 각 단어가 발견될 때마다 (word, 1) pair를 산출했다. 예컨대 “cat”라는 단어가 3번 발견되면 ("cat", 1) pair가 3번 출력되고 따라서 Reducer에게도 3번 전달이 되었다. 그러나, Combiner를 통해 이들이 단위 컴퓨터에서 합산되어서 ("cat", 3) pair가 단 한번만 Reducer에게 전달된다. 이처럼 모든 node 에서 여러 번 반복해서 전달되던 항목을 중간합산 하여 각 단어 당 한번씩만 전달됨으로써 shuffle 프로세스에서 요구되는 대역폭을 획기적으로 줄일 수 있게 되고 결과적으로 job 처리속도가 개선된다. 무엇보다 좋은 점은 별도의 프로그래밍 작업을 할 필요 없다는 점이다. 만약 reduce 함수가 commutative 이고 동시에 associative 라면 이를 Combiner로도 이용할 수 있다. word count 프로그램의 경우 driver 프로그램에 다음 한 줄만 삽입하면 된다.

conf.setCombinerClass(Reduce.class);

Combiner는 Reducer interface의 instance여야 하지만 commutativity 또는 associativity 문제로 인해 Reducer 자체가 직접 Combiner로 이용될 수 없는 경우에도 Combiner로 사용할 제3의 class를 작성하면 가능하다.