Apache Spark
Spark 2.4.8 is a MapReduce-like cluster computing framework designed for low-latency iterative jobs and interactive use from an interpreter. It provides clean, language-integrated APIs in Scala and Java, with a rich array of parallel operators. Spark can run on top of the Apache Mesos cluster manager, Hadoop YARN, Amazon EC2, or without an independent resource manager (“standalone mode”).
Spark 3.1.2 is a unified analytics engine for large-scale data processing. It provides high-level APIs in Java, Scala, Python, and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools, including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Structured Streaming for incremental computation and stream processing.
Spark components
A Spark application running on a cluster involves:
- Driver program
- Cluster manager
- Worker Node
RDD
A Resilient Distributed Dataset (RDD) is the basic abstraction in Spark. It represents an immutable, partitioned collection of elements that can be operated on in parallel.
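To make the three properties in that definition concrete (immutable, partitioned, operated on in parallel), here is a minimal pure-Python sketch. `ToyRDD` and its methods are hypothetical names invented for illustration. They only mimic the shape of the RDD interface and are not part of Spark. Partitions are processed one after another here, whereas Spark would hand them to different executors.

```python
from functools import reduce


class ToyRDD:
    """Toy stand-in for an RDD (hypothetical, not the Spark API)."""

    def __init__(self, partitions):
        # Tuples keep the partitions and their contents immutable.
        self._partitions = tuple(tuple(p) for p in partitions)

    @classmethod
    def parallelize(cls, data, num_partitions=2):
        # Split the data into num_partitions contiguous slices.
        items = list(data)
        size, extra = divmod(len(items), num_partitions)
        parts, start = [], 0
        for i in range(num_partitions):
            end = start + size + (1 if i < extra else 0)
            parts.append(items[start:end])
            start = end
        return cls(parts)

    def map(self, fn):
        # Each partition is transformed independently; a new ToyRDD is returned.
        return ToyRDD([fn(x) for x in p] for p in self._partitions)

    def reduce(self, fn):
        # Reduce each non-empty partition, then combine the partial results.
        partials = [reduce(fn, p) for p in self._partitions if p]
        return reduce(fn, partials)

    def collect(self):
        # Flatten all partitions back into one list.
        return [x for p in self._partitions for x in p]

    def num_partitions(self):
        return len(self._partitions)


rdd = ToyRDD.parallelize(range(1, 7), num_partitions=3)
squares = rdd.map(lambda x: x * x)
print(squares.collect())                    # [1, 4, 9, 16, 25, 36]
print(squares.reduce(lambda a, b: a + b))   # 91
print(rdd.collect())                        # [1, 2, 3, 4, 5, 6] (original unchanged)
```

Because `reduce` first combines values inside each partition and then combines the partial results, the function passed to it should be associative and commutative. The same requirement applies to the real `RDD.reduce`.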
Install Spark
cd ~/tmp
curl -O https://apache.claz.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
# list the archive contents, then extract it
tar tvzf spark-3.1.2-bin-hadoop3.2.tgz
tar xvzf spark-3.1.2-bin-hadoop3.2.tgz
# add the next two lines to ~/.bashrc
vi ~/.bashrc
export SPARK_HOME=/home/vitor/tmp/spark-3.1.2-bin-hadoop3.2
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
# reload the shell configuration
. ~/.bashrc
cd ~/tmp/spark-3.1.2-bin-hadoop3.2/conf
# add the next line to spark-env.sh
vi spark-env.sh
SPARK_MASTER_HOST=127.0.0.1
cd ~/tmp
start-master.sh
# to stop it:
# stop-master.sh
# master log file:
# /home/vitor/tmp/spark-3.1.2-bin-hadoop3.2/logs/spark-vitor-org.apache.spark.deploy.master.Master-1-debian.out
# 21/07/23 12:02:49 INFO Master: Starting Spark master at spark://127.0.0.1:7077
# 21/07/23 12:02:49 INFO Master: Running Spark version 3.1.2
# 21/07/23 12:02:49 WARN Utils: Service 'MasterUI' could not bind on port 8080. Attempting port 8081.
# 21/07/23 12:02:49 INFO Utils: Successfully started service 'MasterUI' on port 8081.
# 21/07/23 12:02:49 INFO MasterWebUI: Bound MasterWebUI to 0.0.0.0, and started at http://10.0.2.15:8081
# master web UI (port 8081 here, because 8080 was already in use):
# http://127.0.0.1:8081
# start a worker
start-worker.sh spark://127.0.0.1:7077
# the master web UI should now show:
# http://127.0.0.1:8081
# Alive Workers: 1
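Since the web UI port can change when 8080 is taken, it can help to read the master URL and UI address from the master log instead of guessing. The following is a minimal sketch using only the Python standard library. The function name `parse_master_log` and both regular expressions are assumptions modelled on the log lines quoted above; other Spark versions may word these messages differently.

```python
import re

# Patterns modelled on the log lines quoted above (assumed format).
MASTER_URL_RE = re.compile(r"Starting Spark master at (spark://\S+)")
WEB_UI_RE = re.compile(r"Bound MasterWebUI to \S+ and started at (http://\S+)")


def parse_master_log(text):
    """Return (master_url, web_ui_url); either value is None if not found."""
    master = MASTER_URL_RE.search(text)
    web_ui = WEB_UI_RE.search(text)
    return (master.group(1) if master else None,
            web_ui.group(1) if web_ui else None)


# Sample taken from the log excerpt in the installation steps.
sample = """\
21/07/23 12:02:49 INFO Master: Starting Spark master at spark://127.0.0.1:7077
21/07/23 12:02:49 WARN Utils: Service 'MasterUI' could not bind on port 8080. Attempting port 8081.
21/07/23 12:02:49 INFO MasterWebUI: Bound MasterWebUI to 0.0.0.0, and started at http://10.0.2.15:8081
"""

master_url, web_ui_url = parse_master_log(sample)
print(master_url)   # spark://127.0.0.1:7077
print(web_ui_url)   # http://10.0.2.15:8081
```

In practice the text would come from the `.out` file under `$SPARK_HOME/logs`. The master URL it yields is the value to pass to `start-worker.sh` and to `SparkSession.builder.master(...)`.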
pyspark example
Install pyspark
Also install Apache Hadoop so that HDFS is available.
test_spark1.py
test_spark2.py
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType

# Connect to the standalone master started earlier
master_url = "spark://127.0.0.1:7077"
spark = SparkSession.builder.master(master_url).getOrCreate()
print("spark session created \n\n")
sc = spark.sparkContext

# Build (number, string) pairs and save them as a sequence file.
# Creates folder seqfile1 locally with data; fails if the folder already exists.
rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x))
rdd.saveAsSequenceFile("seqfile1")
print(sorted(sc.sequenceFile("seqfile1").collect()))

data = [('First', 1), ('Second', 2), ('Third', 3), ('Fourth', 4), ('Fifth', 5)]
df = spark.createDataFrame(data)
# Write into HDFS
# Check the result with: hadoop fs -ls /test
df.write.mode('overwrite').csv("hdfs://master:9000/test/example.csv")

# Write a text file to HDFS and read it back as an RDD of lines
textLines = ['bbbb', 'bbb', 'bb', 'b']
df2 = spark.createDataFrame(textLines, StringType())
df2.write.mode('overwrite').text("hdfs://master:9000/test/datab.txt")
lines = sc.textFile("hdfs://master:9000/test/datab.txt")
print(lines.collect())
lineLengths = lines.map(lambda line: len(line))  # RDD with the length of each line
totalLength = lineLengths.reduce(lambda a, b: a + b)
print("total length:" + str(totalLength))
minLength = lineLengths.reduce(lambda a, b: a if a <= b else b)
print("min length:" + str(minLength))
maxLength = lineLengths.reduce(lambda a, b: a if a >= b else b)
print("max length:" + str(maxLength))
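As a quick sanity check on what the script above should print, the same map and reduce steps can be reproduced locally with plain Python, without a cluster or HDFS. This is only a sketch of the arithmetic. It uses `functools.reduce` on ordinary lists instead of Spark RDDs, and the snake_case variable names are chosen here for illustration.

```python
from functools import reduce

# Same pairs that the script writes to the sequence file.
pairs = [(x, "a" * x) for x in range(1, 4)]
print(pairs)  # [(1, 'a'), (2, 'aa'), (3, 'aaa')]

# Same lines that the script writes to datab.txt.
text_lines = ['bbbb', 'bbb', 'bb', 'b']
line_lengths = [len(line) for line in text_lines]  # [4, 3, 2, 1]

# Same reduce functions as in the Spark script.
total_length = reduce(lambda a, b: a + b, line_lengths)
min_length = reduce(lambda a, b: a if a <= b else b, line_lengths)
max_length = reduce(lambda a, b: a if a >= b else b, line_lengths)

print("total length:" + str(total_length))  # total length:10
print("min length:" + str(min_length))      # min length:1
print("max length:" + str(max_length))      # max length:4
```

When the data is read back with `sc.textFile`, the lines may come back in a different order than they were written, because the output is split across part files. The three totals are unaffected, since addition, minimum, and maximum do not depend on order.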