博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
MapReduce 读写 Parquet 格式数据 Demo
阅读量:6759 次
发布时间:2019-06-26

本文共 4628 字,大约阅读时间需要 15 分钟。

import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.GenericOptionsParser;import org.apache.parquet.example.data.Group;import org.apache.parquet.example.data.simple.SimpleGroupFactory;import org.apache.parquet.hadoop.ParquetInputFormat;import org.apache.parquet.hadoop.ParquetOutputFormat;import org.apache.parquet.hadoop.example.GroupReadSupport;import org.apache.parquet.hadoop.example.GroupWriteSupport;import org.apache.parquet.schema.MessageType;import org.apache.parquet.schema.OriginalType;import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;import org.apache.parquet.schema.Types;/** * MR Parquet格式数据读写Demo */public class ParquetReaderAndWriteMRDemo {    public static void main(String[] args) throws Exception {        Configuration conf = new Configuration();        String[] otherargs=new GenericOptionsParser(conf, args).getRemainingArgs();        if(otherargs.length!=3){            System.out.println("
1"); System.out.println("
2"); System.out.println("
3"); System.out.println("
4"); System.exit(2); } //此demo 输入数据为2列 city ip MessageType schema = Types.buildMessage() .required(PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named("city") .required(PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named("ip") .named("pair"); System.out.println("[schema]=="+schema.toString()); GroupWriteSupport.setSchema(schema, conf); Job job = Job.getInstance(conf, "ParquetReadMR"); job.setJarByClass(ParquetReaderAndWriteMRDemo.class); if(otherargs[2].equals("1")){ job.setMapperClass(NormalMapper.class); job.setReducerClass(NormalReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileInputFormat.setInputPaths(job,otherargs[0] ); FileOutputFormat.setOutputPath(job, new Path(otherargs[1])); if (!job.waitForCompletion(true)) return; } if(otherargs[2].equals("3")){ job.setMapperClass(ParquetWriteMapper.class); job.setNumReduceTasks(0); FileInputFormat.setInputPaths(job,otherargs[0] ); //parquet输出 job.setOutputFormatClass(ParquetOutputFormat.class); ParquetOutputFormat.setWriteSupportClass(job, GroupWriteSupport.class);// ParquetOutputFormat.setOutputPath(job, new Path(otherargs[1])); FileOutputFormat.setOutputPath(job, new Path(otherargs[1])); if (!job.waitForCompletion(true)) return; } if(otherargs[2].equals("2")){ //parquet输入 job.setMapperClass(ParquetReadMapper.class); job.setNumReduceTasks(0); job.setInputFormatClass(ParquetInputFormat.class); ParquetInputFormat.setReadSupportClass(job, GroupReadSupport.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileInputFormat.setInputPaths(job,otherargs[0] ); FileOutputFormat.setOutputPath(job, new Path(otherargs[1])); if (!job.waitForCompletion(true)) return; } if(otherargs[2].equals("4")){ //TODO 不想写了 } } public static class ParquetWriteMapper extends Mapper
{ SimpleGroupFactory factory=null; protected void setup(Context context) throws IOException ,InterruptedException { factory = new SimpleGroupFactory(GroupWriteSupport.getSchema(context.getConfiguration())); }; public void map(LongWritable _key, Text ivalue, Context context) throws IOException, InterruptedException { Group pair=factory.newGroup(); String[] strs=ivalue.toString().split("\\s+"); pair.append("city", strs[0]); pair.append("ip", strs[1]); context.write(null,pair); } } public static class ParquetReadMapper extends Mapper
{ public void map(Void _key, Group group, Context context) throws IOException, InterruptedException { String city=group.getString(0, 0); String ip=group.getString(1, 0); context.write(new Text(city),new Text(ip)); } } public static class NormalMapper extends Mapper
{ public void map(LongWritable ikey, Text ivalue, Context context) throws IOException, InterruptedException { String[] strs=ivalue.toString().split("\\s+"); context.write(new Text(strs[0]), new Text(strs[1])); } } public static class NormalReducer extends Reducer
{ public void reduce(Text _key, Iterable
values, Context context) throws IOException, InterruptedException { for (Text text : values) { context.write(_key,text); } } }}

 

转载于:https://www.cnblogs.com/yanghaolie/p/7389543.html

你可能感兴趣的文章
论《我是如何安慰女友的》
查看>>
用宏定义swap(x,y)
查看>>
菜鸟学Java(一)——Ajax异步检查用户名是否存在
查看>>
【Javascript】类,封装性 -- 1
查看>>
Mono for Android安装配置破解
查看>>
uploadfy 常见问题收集
查看>>
WPF----数据绑定
查看>>
子类化GetOpenFileName/GetSaveFileName, 以及钩子函数OFNHookProc的使用的简要说明
查看>>
C语言中判断int,long型等变量是否赋值的方法
查看>>
leetcode -- Longest Valid Parentheses
查看>>
中位数与第K小元素
查看>>
详解JAVA输出Hello World
查看>>
概率问题随笔
查看>>
关于在堆中创建字符串对象的疑惑
查看>>
poj1077(康托展开+bfs+记忆路径)
查看>>
hibernate 树状映射
查看>>
值得 Web 开发人员收藏的20个 HTML5 实例教程
查看>>
经典网页设计:无缝过渡的响应式设计案例
查看>>
ASP.NET MVC 多语言方案
查看>>
移动设备、手机浏览器Javascript滑动事件代码
查看>>