1. Full code for the main() method
// Imports for both the driver below and the mapper in section 2
// (old-style HBase client API, HTable/HTableInterface era)
import java.io.IOException;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public static void main(String[] args)
        throws IOException, ClassNotFoundException, InterruptedException {
    Configuration conf = HBaseConfiguration.create();
    // Extra parameters (metadata and the like) can be set here if needed
    //conf.set("path", inputPath);
    Job job = Job.getInstance(conf, "mapreduce read data from hbase test.");
    job.setJarByClass(MyMapper.class);

    // A plain Scan reads the entire HBase table. That is not wrong as such,
    // but it puts heavy load on the cluster.
    Scan scan = new Scan();
    // Add a startKey and stopKey to read only rows with a given key prefix
    //Scan scan = new Scan(Bytes.toBytes("195861"), Bytes.toBytes("195861" + "1"));
    // Or read only a range ("partition") of row keys
    //Scan scan = new Scan(Bytes.toBytes("195861-123456"), Bytes.toBytes("195861-654321"));

    // Column families to scan; if none are added, every family in the table
    // is scanned. Choose according to your needs.
    scan.addFamily(Bytes.toBytes("info"));
    // Here only the name column of the info family is needed
    scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"));

    // Output path of the job
    FileOutputFormat.setOutputPath(job, new Path("/tmp/test" + new Random().nextInt(10000)));

    // Initialize the job to read from HBase: table name, scan, mapper class,
    // map output key class, map output value class, job
    TableMapReduceUtil.initTableMapperJob("user", scan, MyMapper.class,
            NullWritable.class, Text.class, job);

    // Number of reducers; depends on your needs, and some jobs need none at all
    job.setNumReduceTasks(0);
    // Submit the job to the cluster
    job.waitForCompletion(true);
}
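A note on the commented-out prefix scan: HBase row keys sort as raw bytes, and the example keys have the form "195861-...". Since '-' sorts before '1', every such key falls between the start key "195861" and the stop key "1958611". If you want to match a key prefix without reasoning about byte order, a PrefixFilter does the same job. The sketch below is my own illustration, not part of the original post: the class name PrefixScanExample and the helper method are hypothetical, and only the "195861" prefix is taken from the code above.

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class PrefixScanExample {
    // Build a Scan that returns only rows whose key starts with the given prefix
    public static Scan prefixScan(String prefix) {
        // Start the scan at the prefix itself, so the region servers skip
        // everything that sorts before it ...
        Scan scan = new Scan(Bytes.toBytes(prefix));
        // ... and let the filter end the scan once keys stop matching the prefix
        scan.setFilter(new PrefixFilter(Bytes.toBytes(prefix)));
        return scan;
    }
}

The resulting Scan can be handed to TableMapReduceUtil.initTableMapperJob exactly like the full-table Scan above.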
2. Full code for the Map task
public static class MyMapper extends TableMapper<NullWritable, Text> {

    public HTableInterface user = null;

    public MyMapper() {
    }

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Open a handle on the "user" table so the mapper can write back to it
        user = new HTable(context.getConfiguration(), "user");
        /* Alternatively, load a list of row keys from a file on HDFS:
        FileSystem fs = FileSystem.get(context.getConfiguration());
        BufferedReader reader = new BufferedReader(new InputStreamReader(
                fs.open(new Path(context.getConfiguration().get("fileInput")))));
        String line = null;
        while ((line = reader.readLine()) != null) {
            String userUrn = "195861-" + line;
            userUrnList.add(userUrn);
        }
        reader.close();
        */
    }

    @Override
    protected void map(ImmutableBytesWritable key, Result result, Context context)
            throws IOException, InterruptedException {
        // TODO: do something here with each row (a Put? a Delete? ...)
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Flush any buffered writes and release the table handle
        user.flushCommits();
        user.close();
    }
}
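To make the TODO concrete, here is a minimal sketch of one possible map() body, meant to be dropped into MyMapper above. It only assumes what the driver already set up: the info:name column selected by the Scan, and the `user` table handle opened in setup(). The target column info:name_copy is hypothetical, purely for illustration; this is not the original author's logic.

// requires, in addition to the imports above: import org.apache.hadoop.hbase.client.Put;
@Override
protected void map(ImmutableBytesWritable key, Result result, Context context)
        throws IOException, InterruptedException {
    // Read the info:name cell that the driver's Scan selected
    byte[] name = result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name"));
    if (name == null) {
        return; // this row has no info:name cell
    }

    // Write-back example: copy the value into a hypothetical info:name_copy column
    Put put = new Put(key.get());
    put.add(Bytes.toBytes("info"), Bytes.toBytes("name_copy"), name);
    user.put(put); // buffered; flushed in cleanup() via flushCommits()

    // And/or emit the value to the job's output path on HDFS
    context.write(NullWritable.get(), new Text(Bytes.toString(name)));
}

Because the driver set the number of reducers to 0, whatever is written to the context here goes straight to the output path under /tmp.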