2015년 4월 15일 수요일

하둡 스프링 연동 테스트2 - hadoop 2.6.x with spring 4.0 (MapReduce WordCount example)

context-hadoop.xml에 아래 내용 추가.


    fs.default.name=hdfs://localhost:9000










WordCount.java

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.StringTokenizer;

public class WordCount {
    private static final Logger logger = LoggerFactory.getLogger(WordCount.class);

    public static class TokenizerMapper
            extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            logger.info("map key={}, value={}", key, value);

            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }


    public static class IntSumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            logger.info("reduce key={}", key);

            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }
}

Test.java

@Autowired
private org.apache.hadoop.conf.Configuration hdConf;

@Autowired
private JobRunner wordCountJobRunner;

@Before
public void beforeCopyFile() throws IOException {
    String file = "/Users/paper/Desktop/4/14/debug.2015-04-09.log";

    Path srcFilePath = new Path(file);
    Path dstFilePath = new Path("/input/debug.2015-04-09.log");

    FileSystem hdfs = FileSystem.get(dstFilePath.toUri(), hdConf);
    hdfs.copyFromLocalFile(false, true, srcFilePath, dstFilePath);

    hdfs.delete(new Path("/output/"), true);
}

@Test
public void testRunJob() throws Exception {
    wordCountJobRunner.call();
}


1. Before를 통하여 로컬에 있는 debug.log 파일을 hdfs에 카피 해놓는다.

2. Job을 실행한다.

3. 실행하면 debug.log 파일을 line단위로 읽어들이는걸 확인 할 수 있다. (WordCount$TokenizerMapper)

2015년 4월 12일 일요일

하둡 스프링 연동 테스트 - hadoop 2.6.x with spring 4.0

Hadoop 설치 및 설정은 아래와 같이 (osx 요세미티.)
https://hadoop.apache.org/releases.html#Download ( 2.6.x 버전 )

설치는 아래 블로그 보고 함
http://iamhereweare.blogspot.kr/2014/05/hadoop.html


- pom.xml 에 아래 dependency 추가.


  org.springframework.data
  spring-data-hadoop
  2.1.1.RELEASE



- context-hadoop.xml spring 설정에 파일 추가




    
        fs.default.name=hdfs://localhost:9000
    




아래와 같이 test코드 작성.
- HdTestServiceTest.java

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration({
        "classpath:servlet-context.xml",
        "classpath:config/context-datasource.xml",
        "classpath:config/context-hadoop.xml"
})
public class HdTestServiceTest {
    private static final Logger logger = LoggerFactory.getLogger(HdTestService.class);

    @Autowired
    private org.apache.hadoop.conf.Configuration hdConf;

    @Test
    public void testDoTest() throws Exception {
        FileSystem hdfs = null;
        try {
            Path filePath = new Path("/tmp/test.txt");
            logger.info("filePath.uri={}", filePath.toUri());

            hdfs = FileSystem.get(filePath.toUri(), hdConf);

            if(hdfs.exists(filePath)) {
                logger.info("read file path={}", filePath);
                BufferedReader r = new BufferedReader(new InputStreamReader(hdfs.open(filePath), "utf-8"));
                String line = null;
                do {
                    line = r.readLine();
                    logger.info("  line={}", line);
                }
                while(line != null);
                r.close();

                // dfs.delete(filePath, true);
            } else {
                logger.info("create new file path={}", filePath);

                FSDataOutputStream out = hdfs.create(filePath, false);
                out.write("한글 생성 테스트".getBytes("utf-8"));
                out.flush();
                out.close();
            }
        }
        finally {
            IOUtils.closeQuietly(hdfs);
        }
    }
}

잘된다. 다만 아직 로컬에서 못벗어 났지만.. 벗어날 서버가 없어..
위 코드를 이용하면 파일 업로드 다운로드까지는 구현이 가능하겠다.