package cn.hhb.spark.streaming;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

import java.util.Arrays;

/**
 * Created by dell on 2017/8/2.
 */
public class HDFSWordCount {

    public static void main(String[] args) {
        // Create the SparkConf; run locally with two threads.
        SparkConf conf = new SparkConf()
                .setAppName("HDFSWordCount")
                .setMaster("local[2]")
                .set("spark.testing.memory", "2147480000"); // work around the minimum-memory check on some setups

        // One batch per second.
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

        // Monitor the HDFS directory; each batch contains the lines of
        // files newly added to it.
        JavaDStream<String> lines = jssc.textFileStream("hdfs://spark1:9000/wordcount_dir");

        // Split each line into words.
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" "));
            }
        });

        // Map each word to a (word, 1) pair.
        JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        // Sum the counts per word within each batch.
        JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        wordCounts.print();

        jssc.start();
        jssc.awaitTermination();
        jssc.close();
    }
}
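Note that textFileStream only picks up files that appear in the watched directory after the job has started, and each file should be moved (renamed) into the directory atomically rather than written there in place. Below is a minimal sketch (not part of the original listing) of how one might feed the job, assuming the Hadoop client is on the classpath and the same NameNode address as above; the class name and staging path are hypothetical:

import java.io.PrintWriter;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FeedWordCountDir {
    public static void main(String[] args) throws Exception {
        // Same NameNode address as the streaming job above.
        FileSystem fs = FileSystem.get(new URI("hdfs://spark1:9000"), new Configuration());

        Path staging = new Path("/tmp/words.txt");          // staging path (hypothetical)
        Path target = new Path("/wordcount_dir/words.txt"); // inside the watched directory

        // Write the file outside the watched directory first...
        try (PrintWriter out = new PrintWriter(fs.create(staging, true))) {
            out.println("hello spark hello streaming");
        }

        // ...then rename it in. The rename is atomic on HDFS, so
        // textFileStream never sees a half-written file.
        fs.rename(staging, target);
        fs.close();
    }
}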
Scala version:
package com.hhb.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Created by dell on 2017/8/2.
 */
object HDFSWordCount {

  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setAppName("HDFSWordCount")
      .setMaster("local[2]")
      .set("spark.testing.memory", "2147480000")

    // One batch every five seconds.
    val ssc = new StreamingContext(conf, Seconds(5))

    // Monitor the HDFS directory and count words per batch.
    val lines = ssc.textFileStream("hdfs://spark1:9000/wordcount_dir")
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map((_, 1))
    val wordCounts = pairs.reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
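Both versions count words only within each batch; nothing is carried over across batches. The two listings also use different batch intervals (one second in Java, five seconds in Scala); either works, since the interval just controls how often the monitored directory is rescanned for new files.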