博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
基于HDFS的实时wordcount程序
阅读量:6918 次
发布时间:2019-06-27

本文共 2532 字,大约阅读时间需要 8 分钟。

hot3.png

package cn.hhb.spark.streaming;import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaPairRDD;import org.apache.spark.api.java.function.FlatMapFunction;import org.apache.spark.api.java.function.Function2;import org.apache.spark.api.java.function.PairFunction;import org.apache.spark.streaming.Durations;import org.apache.spark.streaming.api.java.JavaDStream;import org.apache.spark.streaming.api.java.JavaPairDStream;import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;import org.apache.spark.streaming.api.java.JavaStreamingContext;import scala.Tuple2;import java.util.Arrays;/** * Created by dell on 2017/8/2. */public class HDFSWordCount {    public static void main(String[] args) {        // 创建SparkConf        SparkConf conf = new SparkConf()                .setAppName("HDFSWordCount").setMaster("local[2]")                .set("spark.testing.memory", "2147480000");        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));        JavaDStream
lines = jssc.textFileStream("hdfs://spark1:9000/wordcount_dir"); JavaDStream
words = lines.flatMap(new FlatMapFunction
() { @Override public Iterable
call(String s) throws Exception { return Arrays.asList(s.split(" ")); } }); JavaPairDStream
pairs = words.mapToPair(new PairFunction
() { @Override public Tuple2
call(String s) throws Exception { return new Tuple2
(s, 1); } }); JavaPairDStream
wordCounts = pairs.reduceByKey(new Function2
() { @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); wordCounts.print(); jssc.start(); jssc.awaitTermination(); jssc.close(); }}

scala版:

package com.hhb.spark.streamingimport org.apache.spark.SparkConfimport org.apache.spark.streaming.{Seconds, StreamingContext}/**  * Created by dell on 2017/8/2.  */object HDFSWordCount {  def main(args: Array[String]) {    val conf = new SparkConf()      .setAppName("HDFSWordCount")      .setMaster("local[2]")      .set("spark.testing.memory", "2147480000")    val ssc = new StreamingContext(conf, Seconds(5))    val lines = ssc.textFileStream("hdfs://spark1:9000/wordcount_dir")    val words = lines.flatMap(_.split(" "))    val pairs = words.map((_, 1))    val wordCounts = pairs.reduceByKey(_ + _)    wordCounts.print()    ssc.start()    ssc.awaitTermination()  }}

转载于:https://my.oschina.net/hehongbo/blog/1499513

你可能感兴趣的文章
web项目的WEB-INF目录使用说明
查看>>
GitHub Page+Hexo+nexT 搭建个人博客
查看>>
请求和响应
查看>>
除了画佩奇我们还要玩点更高级的
查看>>
30年分布,30年集中——高校认证计费的变革
查看>>
我的友情链接
查看>>
使用Cobbler2.4.0批量自动安装Esxi5.5
查看>>
我的友情链接
查看>>
Nagios 系统监控
查看>>
Python-w3
查看>>
解决Python开发过程中依赖库打包问题的方法
查看>>
jpeg note
查看>>
一个例子告诉你什么是CLR(JVM同理),以及版本兼容
查看>>
文章记录
查看>>
springAop
查看>>
AJAX入门学习-1:理解ajax
查看>>
ESXi中的虚拟机如何使用U盘
查看>>
把别人的Tcl/Tk代码加入到Go语言里13 游戏6 消除方块
查看>>
关于linux hrtimer高精度定时器的使用注意事项
查看>>
高清视频教程网站的搭建和分享
查看>>