版权声明:本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。QQ邮箱地址:1120746959@qq.com,如有任何学术交流,可随时联系。
1 Python技术栈与Spark大数据数据平台整合
-
下载Anaconda3 Linux版本
Anaconda3-5.3.1-Linux-x86_64.sh复制代码
-
安装Anaconda3
bash Anaconda3-5.3.1-Linux-x86_64.sh -b 复制代码
-
环境变量配置PYSPARK_DRIVER_PYTHON以及PYSPARK_PYTHON配置
export SCALA_HOME=/usr/local/install/scala-2.11.8 export JAVA_HOME=/usr/lib/java/jdk1.8.0_45 export HADOOP_HOME=/usr/local/install/hadoop-2.7.3 export SPARK_HOME=/usr/local/install/spark-2.3.0-bin-hadoop2.7 export FLINK_HOME=/usr/local/install/flink-1.6.1 export ANACONDA_PATH=/root/anaconda3 export PYSPARK_DRIVER_PYTHON=$ANACONDA_PATH/bin/ipython export PYSPARK_PYTHON=$ANACONDA_PATH/bin/python export JRE_HOME=${JAVA_HOME}/jre export CLASS_PATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib export PATH=:${JAVA_HOME}/bin:${SCALA_HOME}/bin:${HADOOP_HOME}/bin:${SPARK_HOME}/bin:$PATH export PATH=/root/anaconda3/bin:$PATH复制代码
-
启动Saprk
-
启动jupyter notebook
老版本 PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook --allow-root" pyspark 未来版本 PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS=`jupyter notebook --allow-root` pyspark复制代码
-
jupyter远程访问
jupyter notebook --generate-config vi ~/.jupyter/jupyter_notebook_config.py c.NotebookApp.ip = '*' # 允许访问此服务器的 IP,星号表示任意 IP c.NotebookApp.open_browser = False # 运行时不打开本机浏览器 c.NotebookApp.port = 12035 # 使用的端口,随意设置 c.NotebookApp.enable_mathjax = True # 启用 MathJax c.NotebookApp.allow_remote_access = True复制代码
-
jupyter NoteBook开发界面
- spark程序调试
lines=sc.textFile("/LICENSE") pairs = lines.map(lambda s: (s, 1)) counts = pairs.reduceByKey(lambda a, b: a + b) counts.count() 243 counts.first() (' Apache License', 1)复制代码
-
Standalone模式启动
PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook --allow-root" MASTER=spark://SparkMaster:7077 pyspark复制代码
2 Spark转换运算
2.1 scala 操作
val intRDD=sc.parallelize(List(1,2,3)) intRDD.collect Array[Int] = Array(1, 2, 3)复制代码
2.2 python 操作
-
python基础RDD操作
#parallelize intRDD=sc.parallelize([1,2,3]) intRDD.collect() [1, 2, 3] StringRDD=sc.parallelize(["Apple","Orange"]) StringRDD.collect() ['Apple', 'Orange'] #具名函数 def addOne(x): return x+1 intRDD.map(addOne).collect() #匿名函数 intRDD=sc.parallelize([1,2,3]) intRDD.map(lambda x:x+1).collect() [2, 3, 4] #过滤器 intRDD.filter(lambda x:1< x and x<5).collect() [2, 3] #in stringRDD =sc.parallelize(["apple","blue"]) stringRDD.filter(lambda x:"apple" in x).collect() ['apple'] #distinct intRDD=sc.parallelize([1,2,3,2,7]) intRDD.distinct().collect() [1, 2, 3, 7] #randomSplit sRDD=intRDD.randomSplit([0.4,0.6]) sRDD[0].collect() [1, 2] #groupBy group=intRDD.groupBy(lambda x:"even" if(x%2==0) else "odd").collect() print(group) [('odd',
), ('even', )] print (sorted(group[0][1])) [1, 3, 7] print (sorted(group[1][1])) [2, 2]复制代码 -
python多个RDD转换操作
intRDD1=sc.parallelize(["apple","blue"]) intRDD2=sc.parallelize([1,2]) intRDD3=sc.parallelize(["apple","blue"]) #合并运算 intRDD1.union(intRDD2).union(intRDD3).collect() ['apple', 'blue', 1, 2, 'apple', 'blue'] #交集运算 intRDD1=sc.parallelize([3,1,2,5,5]) intRDD2=sc.parallelize([5,6]) intRDD3=sc.parallelize([2,7]) intRDD1.intersection(intRDD2).collect() [5] intRDD1=sc.parallelize([3,1,2,5,5]) intRDD2=sc.parallelize([5,6]) intRDD3=sc.parallelize([2,7]) intRDD1.subtract(intRDD2).collect() [2, 3, 1] intRDD1.first() intRDD1.take(3) intRDD1.takeOrdered(3) [1, 2, 3] intRDD1.takeOrdered(3,lambda x:-x) [5, 5, 3]复制代码
-
Python RDD基于Key-Value转换
kvRDD1=sc.parallelize([(3,4),[3,6],[5,6],[1,2]]) kvRDD1.collect() [(3, 4), [3, 6], [5, 6], [1, 2]] kvRDD1.keys().collect() [3, 3, 5, 1] kvRDD1.values().collect() [4, 6, 6, 2] kvRDD1.filter(lambda keyvalue :keyvalue[0]<5).collect() [(3, 4), [3, 6], [1, 2]] kvRDD1.mapValues(lambda x:x*x).collect() [(3, 16), (3, 36), (5, 36), (1, 4)] kvRDD1.sortByKey(ascending=False).collect() [[1, 2], (3, 4), [3, 6], [5, 6]] kvRDD1.reduceByKey(lambda x,y:x+y).collect() [(3, 10), (5, 6), (1, 2)]复制代码
-
Python 多个RDD 转换操作
#join kvRDD1=sc.parallelize([(3,4),[3,6],[5,6],[1,2]]) kvRDD2=sc.parallelize([(3,8)]) kvRDD1.join(kvRDD2).collect() [(3, (4, 8)), (3, (6, 8))] #左连接 kvRDD1.leftOuterJoin(kvRDD2).collect() [(3, (4, 8)), (3, (6, 8)), (5, (6, None)), (1, (2, None))] #右连接 kvRDD1.rightOuterJoin(kvRDD2).collect() [(3, (4, 8)), (3, (6, 8))] #去除掉相同的key kvRDD1.subtractByKey(kvRDD2).collect() [(5, 6), (1, 2)] kvRDD1.countByKey() defaultdict(int, {3: 2, 5: 1, 1: 1}) #创建字典,对于Key=3的以value=6为输出 KV1=kvRDD1.collectAsMap() {3: 6, 5: 6, 1: 2} KV1[3] 6 kvRDD1.lookup(3) [4, 6]复制代码
-
Python 的广播变量
kvFruit = sc.parallelize([(1,"apple"),(2,"orange"),(3,"banana"),(4,"grape")]) FruitMap=kvFruit.collectAsMap() print(FruitMap) #广播 broadcastFruitMap=sc.broadcast(FruitMap) print(broadcastFruitMap.value) {1: 'apple', 2: 'orange', 3: 'banana', 4: 'grape'} #取出广播 fruitIds =sc.parallelize([2,4,3,1]) fruitNames =fruitIds.map(lambda x:broadcastFruitMap.value[x]).collect() print ("水果名称" +str(fruitNames)) 水果名称['orange', 'grape', 'banana', 'apple']复制代码
-
Python 的累加器
intRDD=sc.parallelize([1,2,3]) total=sc.accumulator(0.0) num=sc.accumulator(0) intRDD.foreach(lambda i:[total.add(i),num.add(1)]) avg=total.value/num.value print (str(total.value )+" "+ str(num.value) + " "+ str(avg)) 6.0 3 2.0复制代码
-
Python持久化操作
intRDD=sc.parallelize([1,2,3]) intRDD.persist() intRDD.is_cached #没有执行成功 intRDD.persist(StorageLevel.MEMORY_AND_DISK)复制代码
-
python 综合案例
textFile=sc.textFile("/LICENSE") stringRDD = textFile.flatMap(lambda line:line.split(" ")).map(lambda x: (x,1)).reduceByKey(lambda x,y:x+y) print(stringRDD.take(10)) stringRDD.saveAsTextFile("/pythonWordCount") [('', 1445), ('Apache', 6), ('License', 9), ('Version', 2), ('2.0,', 1), ('January', 1), ('2004', 1), ('http://www.apache.org/licenses/', 1), ('TERMS', 2), ('AND', 3)]复制代码
3 总结
通过Python技术栈与Spark大数据数据平台整合,我们将实现python生态最完善的计算和可视化体系。
秦凯新 于深圳 201812132319
版权声明:本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。QQ邮箱地址:1120746959@qq.com,如有任何学术交流,可随时联系。