博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Python技术栈与Spark大数据平台整合实战--大数据ML样本集案例实战
阅读量:6697 次
发布时间:2019-06-25

本文共 6195 字,大约阅读时间需要 20 分钟。

版权声明:本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。QQ邮箱地址:1120746959@qq.com,如有任何学术交流,可随时联系。

1 Python技术栈与Spark大数据数据平台整合

  • 下载Anaconda3 Linux版本

    Anaconda3-5.3.1-Linux-x86_64.sh复制代码
  • 安装Anaconda3

    bash Anaconda3-5.3.1-Linux-x86_64.sh -b 复制代码
  • 环境变量配置PYSPARK_DRIVER_PYTHON以及PYSPARK_PYTHON配置

    export SCALA_HOME=/usr/local/install/scala-2.11.8  export JAVA_HOME=/usr/lib/java/jdk1.8.0_45  export HADOOP_HOME=/usr/local/install/hadoop-2.7.3  export SPARK_HOME=/usr/local/install/spark-2.3.0-bin-hadoop2.7  export FLINK_HOME=/usr/local/install/flink-1.6.1    export ANACONDA_PATH=/root/anaconda3  export PYSPARK_DRIVER_PYTHON=$ANACONDA_PATH/bin/ipython  export PYSPARK_PYTHON=$ANACONDA_PATH/bin/python      export JRE_HOME=${JAVA_HOME}/jre  export CLASS_PATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib  export PATH=:${JAVA_HOME}/bin:${SCALA_HOME}/bin:${HADOOP_HOME}/bin:${SPARK_HOME}/bin:$PATH  export PATH=/root/anaconda3/bin:$PATH复制代码
  • 启动Saprk

  • 启动jupyter notebook

    老版本  PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook --allow-root" pyspark    未来版本  PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS=`jupyter notebook --allow-root` pyspark复制代码

  • jupyter远程访问

    jupyter notebook --generate-config  vi ~/.jupyter/jupyter_notebook_config.py  c.NotebookApp.ip = '*' # 允许访问此服务器的 IP,星号表示任意 IP  c.NotebookApp.open_browser = False # 运行时不打开本机浏览器  c.NotebookApp.port = 12035 # 使用的端口,随意设置  c.NotebookApp.enable_mathjax = True # 启用 MathJax  c.NotebookApp.allow_remote_access = True复制代码
  • jupyter NoteBook开发界面

  • spark程序调试

lines=sc.textFile("/LICENSE")    pairs = lines.map(lambda s: (s, 1))    counts = pairs.reduceByKey(lambda a, b: a + b)        counts.count()    243        counts.first()    ('                                 Apache License', 1)复制代码
  • Standalone模式启动

    PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook --allow-root" MASTER=spark://SparkMaster:7077 pyspark复制代码

2 Spark转换运算

2.1 scala 操作

val intRDD=sc.parallelize(List(1,2,3))    intRDD.collect    Array[Int] = Array(1, 2, 3)复制代码

2.2 python 操作

  • python基础RDD操作

    #parallelize  intRDD=sc.parallelize([1,2,3])  intRDD.collect()  [1, 2, 3]    StringRDD=sc.parallelize(["Apple","Orange"])  StringRDD.collect()  ['Apple', 'Orange']    #具名函数  def addOne(x):      return x+1  intRDD.map(addOne).collect()  #匿名函数 intRDD=sc.parallelize([1,2,3]) intRDD.map(lambda x:x+1).collect()  [2, 3, 4]  #过滤器 intRDD.filter(lambda x:1< x and x<5).collect() [2, 3]  #in stringRDD =sc.parallelize(["apple","blue"]) stringRDD.filter(lambda x:"apple" in x).collect() ['apple']  #distinct intRDD=sc.parallelize([1,2,3,2,7]) intRDD.distinct().collect() [1, 2, 3, 7]  #randomSplit sRDD=intRDD.randomSplit([0.4,0.6]) sRDD[0].collect() [1, 2]  #groupBy group=intRDD.groupBy(lambda x:"even" if(x%2==0) else "odd").collect() print(group)  [('odd', 
    ), ('even',
    )] print (sorted(group[0][1])) [1, 3, 7] print (sorted(group[1][1])) [2, 2]复制代码
  • python多个RDD转换操作

    intRDD1=sc.parallelize(["apple","blue"]) intRDD2=sc.parallelize([1,2]) intRDD3=sc.parallelize(["apple","blue"])  #合并运算 intRDD1.union(intRDD2).union(intRDD3).collect()  ['apple', 'blue', 1, 2, 'apple', 'blue'] #交集运算 intRDD1=sc.parallelize([3,1,2,5,5]) intRDD2=sc.parallelize([5,6]) intRDD3=sc.parallelize([2,7]) intRDD1.intersection(intRDD2).collect() [5]  intRDD1=sc.parallelize([3,1,2,5,5]) intRDD2=sc.parallelize([5,6]) intRDD3=sc.parallelize([2,7]) intRDD1.subtract(intRDD2).collect() [2, 3, 1]  intRDD1.first() intRDD1.take(3)  intRDD1.takeOrdered(3) [1, 2, 3]  intRDD1.takeOrdered(3,lambda x:-x) [5, 5, 3]复制代码
  • Python RDD基于Key-Value转换

    kvRDD1=sc.parallelize([(3,4),[3,6],[5,6],[1,2]])  kvRDD1.collect()  [(3, 4), [3, 6], [5, 6], [1, 2]]    kvRDD1.keys().collect()  [3, 3, 5, 1]    kvRDD1.values().collect()  [4, 6, 6, 2]    kvRDD1.filter(lambda keyvalue :keyvalue[0]<5).collect()  [(3, 4), [3, 6], [1, 2]]    kvRDD1.mapValues(lambda x:x*x).collect()  [(3, 16), (3, 36), (5, 36), (1, 4)]    kvRDD1.sortByKey(ascending=False).collect()  [[1, 2], (3, 4), [3, 6], [5, 6]]    kvRDD1.reduceByKey(lambda x,y:x+y).collect()  [(3, 10), (5, 6), (1, 2)]复制代码
  • Python 多个RDD 转换操作

    #join  kvRDD1=sc.parallelize([(3,4),[3,6],[5,6],[1,2]])  kvRDD2=sc.parallelize([(3,8)])    kvRDD1.join(kvRDD2).collect()     [(3, (4, 8)), (3, (6, 8))]    #左连接  kvRDD1.leftOuterJoin(kvRDD2).collect()  [(3, (4, 8)), (3, (6, 8)), (5, (6, None)), (1, (2, None))]    #右连接  kvRDD1.rightOuterJoin(kvRDD2).collect()  [(3, (4, 8)), (3, (6, 8))]    #去除掉相同的key  kvRDD1.subtractByKey(kvRDD2).collect()  [(5, 6), (1, 2)]    kvRDD1.countByKey()  defaultdict(int, {3: 2, 5: 1, 1: 1})      #创建字典,对于Key=3的以value=6为输出  KV1=kvRDD1.collectAsMap()  {3: 6, 5: 6, 1: 2}  KV1[3]  6    kvRDD1.lookup(3)  [4, 6]复制代码
  • Python 的广播变量

    kvFruit = sc.parallelize([(1,"apple"),(2,"orange"),(3,"banana"),(4,"grape")])  FruitMap=kvFruit.collectAsMap()    print(FruitMap)    #广播  broadcastFruitMap=sc.broadcast(FruitMap)  print(broadcastFruitMap.value)  {1: 'apple', 2: 'orange', 3: 'banana', 4: 'grape'}  #取出广播  fruitIds =sc.parallelize([2,4,3,1])  fruitNames =fruitIds.map(lambda x:broadcastFruitMap.value[x]).collect()  print ("水果名称" +str(fruitNames))    水果名称['orange', 'grape', 'banana', 'apple']复制代码
  • Python 的累加器

    intRDD=sc.parallelize([1,2,3])    total=sc.accumulator(0.0)  num=sc.accumulator(0)    intRDD.foreach(lambda i:[total.add(i),num.add(1)])  avg=total.value/num.value  print (str(total.value )+" "+ str(num.value) + " "+ str(avg))    6.0 3 2.0复制代码
  • Python持久化操作

    intRDD=sc.parallelize([1,2,3])  intRDD.persist()    intRDD.is_cached     #没有执行成功  intRDD.persist(StorageLevel.MEMORY_AND_DISK)复制代码
  • python 综合案例

    textFile=sc.textFile("/LICENSE")    stringRDD = textFile.flatMap(lambda line:line.split(" ")).map(lambda x: (x,1)).reduceByKey(lambda x,y:x+y)  print(stringRDD.take(10))  stringRDD.saveAsTextFile("/pythonWordCount")    [('', 1445), ('Apache', 6), ('License', 9), ('Version', 2), ('2.0,', 1), ('January', 1), ('2004', 1), ('http://www.apache.org/licenses/', 1), ('TERMS', 2), ('AND', 3)]复制代码

3 总结

通过Python技术栈与Spark大数据数据平台整合,我们将实现python生态最完善的计算和可视化体系。

秦凯新 于深圳 201812132319

版权声明:本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。QQ邮箱地址:1120746959@qq.com,如有任何学术交流,可随时联系。

转载地址:http://hlvoo.baihongyu.com/

你可能感兴趣的文章
[CF521D]Shop
查看>>
Flask-在浏览器中直接显示文本文件中的内容
查看>>
ubuntu下切换默认的python版本
查看>>
《Android深度探索》(卷1)HAL与驱动开发读后感
查看>>
SQL 建立临时表和变量表
查看>>
redis
查看>>
seo——如何布局内链系统
查看>>
纯CSS3实现iOS7扁平化图标
查看>>
前端常用插件、工具类库汇总
查看>>
iOS 网络与多线程--5.异步Post方式的网络请求(非阻塞)
查看>>
python-灰色预测平均房价趋势kera深度学习库的介绍
查看>>
linux——查看系统日志错误并解决
查看>>
cuda+ffmpeg+opengl解码rtsp h264码流多路
查看>>
Android权限大全代码
查看>>
svn:previous operation has not finished
查看>>
PHP Socket 编程进阶指南
查看>>
PHP-CPP开发扩展(一)
查看>>
Git常用命令
查看>>
DL中epoch、batch等的意义【转载】
查看>>
职业规划
查看>>