Apache Spark

Spark 2.4.8 is a MapReduce-like cluster computing framework designed for low-latency iterative jobs and interactive use from an interpreter. It provides clean, language-integrated APIs in Scala and Java, with a rich array of parallel operators. Spark can run on top of the Apache Mesos cluster manager, Hadoop YARN, Amazon EC2, or without an independent resource manager (“standalone mode”).

Spark 3.1.2: Apache Spark is a unified analytics engine for large-scale data processing. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools, including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Structured Streaming for incremental computation and stream processing.
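
As a small illustration of the high-level APIs, the sketch below builds a DataFrame and queries it with Spark SQL. It assumes a local pyspark installation; the data, the app name and the people view are invented for the example.

   from pyspark.sql import SparkSession

   # hypothetical example data, only to illustrate the DataFrame and SQL APIs
   spark = SparkSession.builder.master("local[*]").appName("sql-demo").getOrCreate()
   people = spark.createDataFrame(
       [("Ana", 31), ("Bob", 25), ("Carla", 40)], ["name", "age"])
   people.createOrReplaceTempView("people")  # make the DataFrame queryable from SQL
   spark.sql("SELECT name, age FROM people WHERE age > 30").show()
   spark.stop()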

Spark components

RDD

A Resilient Distributed Dataset (RDD) is the basic abstraction in Spark. It represents an immutable, partitioned collection of elements that can be operated on in parallel.
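
A minimal sketch of the RDD abstraction in pyspark, assuming a SparkContext obtained from a local session; the numbers are arbitrary example data:

   from pyspark.sql import SparkSession

   spark = SparkSession.builder.master("local[*]").getOrCreate()
   sc = spark.sparkContext

   # distribute a local collection into a partitioned RDD
   numbers = sc.parallelize(range(1, 11), numSlices=4)

   # transformations return new RDDs; the original RDD is never modified
   squares = numbers.map(lambda x: x * x)

   # actions trigger the parallel computation and return a result to the driver
   print(squares.reduce(lambda a, b: a + b))   # 385
   spark.stop()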

Install Spark

   cd ~/tmp
   curl -O https://apache.claz.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
   tar tvzf spark-3.1.2-bin-hadoop3.2.tgz   # list the archive contents
   tar xvzf spark-3.1.2-bin-hadoop3.2.tgz   # extract
   vi ~/.bashrc
   # add the following two lines to ~/.bashrc:
   export SPARK_HOME=/home/vitor/tmp/spark-3.1.2-bin-hadoop3.2
   export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
   . ~/.bashrc
   cd ~/tmp/spark-3.1.2-bin-hadoop3.2/conf
   vi spark-env.sh
   # add the following line to spark-env.sh:
   SPARK_MASTER_HOST=127.0.0.1
   cd ~/tmp
   start-master.sh
   # stop it:
   # stop-master.sh
   # log file: /home/vitor/tmp/spark-3.1.2-bin-hadoop3.2/logs/spark-vitor-org.apache.spark.deploy.master.Master-1-debian.out
   # 21/07/23 12:02:49 INFO Master: Starting Spark master at spark://127.0.0.1:7077
   # 21/07/23 12:02:49 INFO Master: Running Spark version 3.1.2
   # 21/07/23 12:02:49 WARN Utils: Service 'MasterUI' could not bind on port 8080. Attempting port 8081.
   # 21/07/23 12:02:49 INFO Utils: Successfully started service 'MasterUI' on port 8081.
   # 21/07/23 12:02:49 INFO MasterWebUI: Bound MasterWebUI to 0.0.0.0, and started at http://10.0.2.15:8081
   # master web UI: http://127.0.0.1:8081
   # start a worker and attach it to the master
   start-worker.sh spark://127.0.0.1:7077
   # refresh the web UI at http://127.0.0.1:8081
   # Alive Workers: 1
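
With the master and one worker running, the installation can be smoke-tested from the pyspark shell that ships with the distribution, started as pyspark --master spark://127.0.0.1:7077. The lines below are a minimal sketch of what to type at the >>> prompt; sc is the SparkContext the shell creates automatically, and the numbers are arbitrary example data.

   # inside the pyspark shell, sc is already defined
   rdd = sc.parallelize(range(100))
   rdd.count()   # expected: 100
   rdd.sum()     # expected: 4950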

pyspark example

Install pyspark

   cd ~/tmp
   mkdir pyspark-test
   cd pyspark-test
   sudo apt install python3-venv
   python3 -m venv virtenv
   . virtenv/bin/activate
   pip3 install --upgrade setuptools pip distlib
   pip3 install pyspark
   cd ~/tmp/pyspark-test/
   . virtenv/bin/activate
   python3 test_spark1.py

test_spark1.py

   from pyspark.sql import SparkSession

   master_url = "spark://127.0.0.1:7077"
   spark = SparkSession.builder.master(master_url).getOrCreate()
   print("spark session created")

test_spark2.py

   from pyspark.sql import SparkSession
   from pyspark.sql.types import StringType

   master_url = "spark://127.0.0.1:7077"
   spark = SparkSession.builder.master(master_url).getOrCreate()
   print("spark session created \n\n")
   sc = spark.sparkContext

   rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x))
   rdd.saveAsSequenceFile("seqfile1")  # creates folder seqfile1 locally with the data
   print(sorted(sc.sequenceFile("seqfile1").collect()))

   data = [('First', 1), ('Second', 2), ('Third', 3), ('Fourth', 4), ('Fifth', 5)]
   df = spark.createDataFrame(data)
   # Write into HDFS (assumes an HDFS namenode reachable at master:9000)
   # hadoop fs -ls /test
   df.write.mode('overwrite').csv("hdfs://master:9000/test/example.csv")

   textLines = ['bbbb', 'bbb', 'bb', 'b']
   df2 = spark.createDataFrame(textLines, StringType())
   df2.write.mode('overwrite').text("hdfs://master:9000/test/datab.txt")
   lines = sc.textFile("hdfs://master:9000/test/datab.txt")
   print(lines.collect())
   lineLengths = lines.map(lambda line: len(line))  # RDD with the length of each line
   totalLength = lineLengths.reduce(lambda a, b: a + b)
   print("total length:" + str(totalLength))
   minLength = lineLengths.reduce(lambda a, b: a if a <= b else b)
   print("min length:" + str(minLength))
   maxLength = lineLengths.reduce(lambda a, b: a if a >= b else b)
   print("max length:" + str(maxLength))
