Tools/Hadoop

MapReduce 이해 하기

칼쵸쵸 2023. 7. 25. 19:09

공식 문서 : https://hadoop.apache.org/docs/stable/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduceTutorial.html

 

Apache Hadoop 3.3.6 – MapReduce Tutorial

<!--- Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or a

hadoop.apache.org

맵리듀스(MapReduce)는 대규모 데이터를 처리하기 위한 분산 컴퓨팅 프로그래밍 모델로, 구글에서 처음 개발되었습니다. 맵리듀스는 데이터 처리를 두 단계로 나누어 수행하며, 각각 "(Map)" 단계와 "리듀스(Reduce)" 단계로 구성됩니다

 

(Map) 단계: 

입력 데이터를 작은 조각으로 나누고, 이러한 작은 조각들을 독립적으로 처리합니다.

각 조각은 키-(key-value) 쌍의 형태로 매핑됩니다.

사용자가 정의한 맵 함수를 적용하여 키-값 쌍의 리스트를 생성합니다.

맵 함수는 입력 데이터를 적절하게 가공하여 중간 결과를 생성하는 역할을 합니다.

맵 함수가 모든 입력 조각에 대해 병렬로 수행되기 때문에, 처리 속도가 빠릅니다.

맵 함수의 결과로 생성된 키-값 쌍은 동일한 키를 갖는 데이터끼리 묶인다는 특징이 있습니다.

 

셔플과 정렬(Shuffle and Sort) 단계:

맵 단계에서 생성된 중간 결과인 키-값 쌍을 리듀스 단계로 보내기 전에 셔플과 정렬이 이루어집니다.

셔플 단계에서는 동일한 키를 갖는 중간 결과들이 같은 리듀스 태스크로 전송되도록 조정합니다.

정렬 단계에서는 동일한 키를 갖는 중간 결과들이 키 순서대로 정렬됩니다.

 

리듀스(Reduce) 단계:

셔플과 정렬 단계에서 키 순서대로 정렬된 중간 결과를 사용자가 정의한 리듀스 함수에 입력으로 제공합니다.

리듀스 함수는 동일한 키를 갖는 모든 중간 결과들을 하나의 리스트로 묶습니다.

사용자가 정의한 리듀스 함수에 따라 중간 결과들을 합치거나 집계하여 최종 결과를 생성합니다.

리듀스 함수는 입력 데이터를 최종 결과로 변환하는 역할을 수행합니다.

리듀스 함수는 병렬로 실행되며, 각 키에 대해 하나의 결과를 생성합니다.

 

https://www.edureka.co/blog/mapreduce-tutorial/

맵리듀스 프레임워크는 오로지 <, > 쌍에 대해서만 동작합니다. , 프레임워크는 작업의 입력을 <, > 쌍의 집합으로 간주하고, 작업의 출력으로 다른 유형의 <, > 쌍의 집합을 생성합니다.

 

(key)와 값(value) 클래스는 프레임워크에 의해 직렬화(Serialization)될 수 있어야 하므로 Writable 인터페이스를 구현해야 합니다. 또한, 프레임워크에 의해 정렬(Sorting)을 용이하게 하기 위해 키 클래스는 WritableComparable 인터페이스를 구현해야 합니다.

 

맵리듀스 작업의 입력과 출력 유형은 다음과 같습니다:

 

(입력) <k1, v1> -> (Map) -> <k2, v2> -> 컴바인(Combine) -> <k2, v2> -> 리듀스(Reduce) -> <k3, v3> (출력)

 

맵리듀스 작업은 입력으로 주어진 <, > 쌍을 맵 함수를 통해 변환하여 중간 결과인 <k2, v2> 형태로 출력합니다. 그런 다음 컴바인 과정을 거쳐 동일한 키를 갖는 중간 결과들을 병합합니다. 마지막으로 리듀스 함수를 사용하여 최종 결과인 <k3, v3> 쌍으로 출력을 생성합니다.

 

word count 예시

소스코드

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

 

실행 명령어

# 환경 설정

export JAVA_HOME=/usr/java/default
export PATH=${JAVA_HOME}/bin:${PATH}
export HADOOP_CLASSPATH=${JAVA_HOME}/lib/tools.jar

# jar 생성

$ bin/hadoop com.sun.tools.javac.Main WordCount.java
$ jar cf wc.jar WordCount*.class

# 파일 위치
# /user/joe/wordcount/input - input directory in HDFS
# /user/joe/wordcount/output - output directory in HDFS

#Sample text-files as input:
$ bin/hadoop fs -ls /user/joe/wordcount/input/
/user/joe/wordcount/input/file01
/user/joe/wordcount/input/file02

$ bin/hadoop fs -cat /user/joe/wordcount/input/file01
Hello World Bye World

$ bin/hadoop fs -cat /user/joe/wordcount/input/file02
Hello Hadoop Goodbye Hadoop



# 실행
$ bin/hadoop jar wc.jar WordCount /user/joe/wordcount/input /user/joe/wordcount/output

# 결과
$ bin/hadoop fs -cat /user/joe/wordcount/output/part-r-00000
Bye 1
Goodbye 1
Hadoop 2
Hello 2
World 2

 

설명 

Mapper

public void map(Object key, Text value, Context context
                ) throws IOException, InterruptedException {
  StringTokenizer itr = new StringTokenizer(value.toString());
  while (itr.hasMoreTokens()) {
    word.set(itr.nextToken());
    context.write(word, one);
  }
}

 

 

Mapper 구현은 map 메서드를 통해 한 번에 한 줄씩 처리합니다. 이 때 입력은 지정된 TextInputFormat에 의해 제공되는 형식으로 되어 있습니다. 그리고 StringTokenizer를 사용하여 줄을 공백으로 구분된 토큰으로 나눕니다. 그리고 최종적으로 <word, 1> 형태의 키-값 쌍을 발행합니다.

 

1. TextInputFormat: 이는 하둡의 기본 입력 형식 중 하나로, 텍스트 파일의 각 줄을 입력으로 제공하는 형식입니다. 매퍼는 입력 파일을 줄 단위로 읽고 각 줄을 처리합니다.

 

2. map 메서드: 이 메서드는 매퍼의 핵심 메서드로, 입력 데이터를 가공하여 키-값 쌍을 발행하는 역할을 합니다.

 

3. StringTokenizer: 이는 입력으로 받은 한 줄의 텍스트를 공백을 기준으로 토큰으로 분리하는 데 사용됩니다. , 각 줄의 단어들을 분리합니다.

 

-값 쌍 발행: StringTokenizer를 사용하여 분리된 각 단어를 매퍼는 <word, 1> 형태의 키-값 쌍으로 변환합니다. 이때 word는 토큰으로 나눠진 단어이며, 1은 해당 단어가 한 번 등장했음을 의미하는 값을 가집니다.

 

이렇게 매퍼는 입력 데이터를 줄 단위로 읽어와서 각 단어를 토큰으로 분리하고, 그 결과를 <word, 1> 형태의 키-값 쌍으로 변환하여 출력합니다. 이렇게 발행된 키-값 쌍은 이후 리듀서로 전달되어 단어의 출현 빈도를 집계하는 작업을 수행하게 됩니다.

 

 

reduce 

reduce 이 메서드는 리듀서의 핵심 메서드로, 같은 키를 갖는 값들을 합산하는 역할을 합니다.

public void reduce(Text key, Iterable<IntWritable> values,
                   Context context
                   ) throws IOException, InterruptedException {
  int sum = 0;
  for (IntWritable val : values) {
    sum += val.get();
  }
  result.set(sum);
  context.write(key, result);
}

 

값 합산: 리듀서는 같은 키(: 같은 단어)를 가진 값들(출현 횟수)을 받아와서 이들 값을 합산합니다. 이렇게 함으로써 같은 키에 대한 출현 빈도를 구할 수 있습니다.

 

예를 들어, 매퍼에서 <word, 1> 형태로 발행된 키-값 쌍들은 리듀서로 전달됩니다. 이때, 같은 단어를 가진 여러 개의 값들이 함께 전달됩니다. 리듀서는 이러한 값들을 받아와서 해당 단어의 출현 횟수를 합산합니다. , 같은 단어를 가진 값들을 더하여 해당 단어의 총 출현 횟수를 계산합니다.

 

이렇게 리듀서는 합산된 결과를 출력으로 내보내며, 출력은 <word, total_count> 형태로 이루어집니다. 이때 word는 단어를 의미하고, total_count는 해당 단어의 총 출현 횟수를 의미합니다.

 

따라서 리듀서는 각 단어의 출현 빈도를 계산하기 위해 같은 단어를 가진 값들을 합산하는 작업을 수행합니다. 이를 통해 맵리듀스 작업은 입력으로 주어진 텍스트 데이터에서 각 단어의 출현 빈도를 효율적으로 계산할 수 있게 됩니다.

 

환경 설정

config 설정을 통해 map, shuffle/ reduce에 대한 파라미터를 설정 할 수 있습니다.

Configuration conf = new Configuration();
// 맵리듀스 작업에 대한 다른 설정들을 추가할 수 있습니다.

// mapreduce.task.io.sort.mb 설정 추가
conf.setInt("mapreduce.task.io.sort.mb", 512);

Job job = Job.getInstance(conf, "MapReduce Job");

 

XML 형식으로도 설정 가능

<configuration>
  <property>
    <name>mapreduce.map.java.opts</name>
    <value>
    -Xmx512M -Djava.library.path=/home/mycompany/lib -verbose:gc -Xloggc:/tmp/@taskid@.gc
    -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false
    </value>
  </property>
</configuration>
Configuration conf = new Configuration();
conf.addResource(new Path("/path/to/mapred-site.xml")); // XML 설정 파일 로드

// 맵리듀스 작업에 필요한 다른 설정들을 추가할 수 있습니다.

Job job = Job.getInstance(conf, "MapReduce Job");

 

 

Map 설정

mapreduce.task.io.sort.mb

이 변수는 각 Map 또는 Reduce 작업의 중간 결과를 정렬하기 위한 메모리 버퍼 크기를 지정합니다. 중간 결과가 맵 또는 리듀스 작업 내에서 정렬되어야 하는 경우에 사용됩니다. 맵 태스크의 경우, 이 변수는 맵 출력을 정렬하는 데 사용되며, 리듀스 태스크의 경우에는 리듀스 입력을 정렬하는 데 사용됩니다.

데이터가 많을수록 더 많은 메모리가 필요하지만, 너무 큰 값을 설정하면 메모리가 부족하게 되어 오히려 성능이 저하될 수 있습니다. 이 값을 적절하게 조정하는 것이 중요합니다.

 

기본값은 일반적으로 클러스터의 총 메모리에 따라 달라지지만, 예를 들어 1 기가바이트의 값일 수 있습니다. 다음은 설정 예시입니다:

 

 

mapreduce.map.sort.spill.percent

이 변수는 맵 작업이 출력 데이터를 정렬하는 데 사용하는 메모리 버퍼 크기를 지정합니다. MapReduce 프로세스 중간에 맵 출력이 일정 크기에 도달하면 정렬을 시작해야 합니다. 그러나 모든 출력 데이터를 한꺼번에 정렬하는 것은 메모리 사용 측면에서 비효율적일 수 있습니다. 대신 이 변수는 맵 출력 데이터가 얼마나 차면(얼마나 채워지면) 정렬을 시작해야 하는지를 백분율로 지정합니다.

기본적으로 Hadoop은 맵 출력 데이터가 90% 정도 채워지면 정렬 작업을 시작합니다. 이 값을 높게 설정하면 더 많은 데이터가 채워진 후에 정렬을 시작하므로, 정렬을 수행하는 데 더 많은 메모리가 필요하지만, 더 큰 덩어리로 데이터를 정렬하게 되어 정렬 속도가 향상될 수 있습니다.

 

 

shuffle/ reduce 설정

mapreduce.task.io.sort.factor

이 변수는 맵과 리듀스 작업에서 사용하는 스트림 정렬을 위한 버퍼의 개수를 지정합니다. 정렬된 결과를 디스크로 기록할 때, 이 변수는 정렬된 결과를 임시로 저장하는 버퍼의 개수를 나타냅니다. 이렇게 함으로써 다중 디스크나 병렬 처리를 사용하여 더 효율적인 정렬을 수행할 수 있습니다.

기본값은 보통 100입니다만, 클러스터의 하드웨어 구성과 작업의 특성에 따라 조정이 필요할 수 있습니다.

 

mapreduce.reduce.merge.inmem.thresholds

이 변수는 리듀스 작업에서 메모리에 보관되는 맵 출력의 크기를 지정합니다. 리듀스 태스크가 실행되는 동안 맵 출력이 메모리에 충분히 많이 쌓이면, 이러한 맵 출력을 병합하여 디스크에 기록합니다. mapreduce.reduce.merge.inmem.thresholds 변수는 메모리에 남아있는 맵 출력의 크기가 어느 정도인 경우에 이러한 병합 작업을 수행해야 하는지를 지정합니다.

기본값은 일반적으로 100입니다만, 클러스터의 하드웨어 및 작업의 특성에 따라 다른 값을 사용할 수 있습니다.

 

mapreduce.reduce.shuffle.merge.percent

이 변수는 리듀스 작업의 셔플 단계에서 병합되는 데이터의 비율을 지정합니다. 셔플 단계는 맵 태스크의 출력을 리듀스 태스크로 전송하고, 리듀스 태스크는 동일한 키를 가진 맵 출력을 병합합니다. mapreduce.reduce.shuffle.merge.percent 변수는 리듀스 태스크가 이러한 맵 출력을 병합할 때, 병합하는 데이터의 양을 백분율로 지정합니다.

기본값은 일반적으로 66.7입니다만, 클러스터의 네트워크 대역폭과 작업의 특성에 따라 다른 값을 사용할 수 있습니다.

 

mapreduce.reduce.shuffle.input.buffer.percent

이 변수는 리듀스 작업의 셔플 단계에서 입력을 처리하는 데 사용되는 버퍼 크기를 지정합니다. 셔플 단계에서는 맵 출력 데이터를 리듀스 태스크로 전송하고, 리듀스 태스크는 이러한 데이터를 정렬하고 병합하는 작업을 수행합니다. 이때 mapreduce.reduce.shuffle.input.buffer.percent 변수는 리듀스 태스크가 네트워크로부터 받아들이는 입력 데이터의 양을 백분율로 지정합니다.

기본값은 일반적으로 70입니다만, 클러스터의 네트워크 대역폭과 작업의 특성에 따라 다른 값을 사용할 수 있습니다.

 

mapreduce.reduce.input.buffer.percent

이 변수는 리듀스 작업에서 입력을 처리하는 데 사용되는 버퍼 크기를 지정합니다. 리듀스 작업은 맵 작업의 출력을 받아들이고 결과를 저장합니다. mapreduce.reduce.input.buffer.percent 변수는 리듀스 태스크가 입력 데이터를 처리하는 데 사용하는 메모리 버퍼의 크기를 백분율로 지정합니다.

기본값은 일반적으로 0.0이며, 이는 리듀스 태스크가 디스크로부터 데이터를 직접 읽도록 설정되어 있는 것을 의미합니다. 클러스터의 메모리 구성과 작업의 특성에 따라 이 값을 적절하게 조정할 수 있습니다.

 

 

실행

 job.waitForCompletion(true)

System.exit(job.waitForCompletion(true) ? 0 : 1);

 Hadoop MapReduce 작업이 완료될 때까지 기다리는 메서드입니다. 이 메서드는 작업이 성공적으로 완료되면 true를 반환하고, 작업이 실패하거나 예외가 발생하면 false를 반환합니다. 기본적으로 이 메서드는 작업이 완료될 때까지 현재 스레드를 블록하며, 작업이 끝나면 다음 코드 라인으로 진행됩니다.

 

하지만 job.waitForCompletion(true) 이외에도 작업을 제출하고 실행하는 다른 방법들이 있습니다. MapReduce 작업을 제출하는 방법에는 크게 두 가지가 있습니다:

 

job.waitForCompletion(false)

job.waitForCompletion(false)는 작업을 제출하고 완료 여부를 기다리지 않고 즉시 다음 코드 라인으로 진행하는 비동기적 방식으로 작업을 실행합니다. 이렇게 하면 작업이 백그라운드에서 실행되며, 메인 스레드는 다른 작업을 계속 수행할 수 있습니다.

 

작업이 완료된 후 작업의 상태를 확인하거나 결과를 처리하려면 job.isComplete() 메서드와 job.isSuccessful() 메서드를 사용할 수 있습니다.

 

Job job = new Job(conf, "my_job");
// ... 설정 및 입력 경로 등의 작업 설정 ...
job.submit(); // 작업을 제출하고 즉시 다음 코드 라인으로 진행

// 다른 작업을 수행하는 코드

// 작업이 완료될 때까지 기다리고 결과 처리
if (job.waitForCompletion(false)) {
    // 작업이 성공적으로 완료됨
} else {
    // 작업이 실패하거나 예외가 발생함
}

 

Asynchronous method (비동기 메서드)

MapReduce 버전 2부터는 비동기적으로 작업을 제출하고 완료 여부를 확인하는 메서드가 도입되었습니다. 예를 들어, job.submit() 대신 job.submitAsync()를 사용하여 비동기적으로 작업을 제출하고, job.monitorAndPrintJob()을 사용하여 작업의 진행 상황을 출력할 수 있습니다.

Job job = new Job(conf, "my_job");
// ... 설정 및 입력 경로 등의 작업 설정 ...
job.submitAsync(); // 작업을 비동기적으로 제출

// 다른 작업을 수행하는 코드

// 작업의 진행 상황을 출력
job.monitorAndPrintJob();

// 작업이 완료될 때까지 기다리고 결과 처리
if (job.waitForCompletion(true)) {
    // 작업이 성공적으로 완료됨
} else {
    // 작업이 실패하거나 예외가 발생함
}

비동기 메서드를 사용하면 작업의 진행 상황을 출력하거나 다른 비동기적 작업들과 함께 병렬로 실행할 수 있습니다. 이렇게 함으로써 작업의 실행 및 모니터링을 유연하게 조절할 수 있습니다.

'Tools > Hadoop' 카테고리의 다른 글

하둡 클러스터와 리소스 할당  (0) 2023.09.26
하둡 네임노드와 HDFS  (0) 2023.09.11
Hadoop YARN 아키텍쳐  (0) 2023.07.02
Hadoop HDFS 아키텍쳐  (0) 2023.07.02
Hadoop 컴퓨팅과 클러스터  (0) 2023.07.02