= Apache Spark =
[[https://spark.apache.org/docs/2.4.8/index.html|Spark 2.4.8]] is a '''MapReduce-like cluster computing framework designed for low-latency iterative jobs and interactive use from an interpreter'''. It provides clean, language-integrated APIs in Scala and Java, with a rich array of parallel operators. Spark can run on top of the Apache Mesos cluster manager, Hadoop YARN, Amazon EC2, or without an independent resource manager (“standalone mode”).

[[https://spark.apache.org/docs/3.1.2/index.html|Spark 3.1.2]]: Apache Spark is a '''unified analytics engine for large-scale data processing'''. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Structured Streaming for incremental computation and stream processing.

== Spark components ==
 * Driver program
 * Cluster manager
 * Worker node
 * [[https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.RDD.html#pyspark-rdd|Resilient distributed dataset (RDD)]]

=== RDD ===
A Resilient Distributed Dataset (RDD) is the basic abstraction in Spark. It represents an immutable, partitioned collection of elements that can be operated on in parallel.
 * https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.SparkContext.html#pyspark-sparkcontext
 * https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.RDD.html#pyspark-rdd

== Install Spark ==
{{{#!highlight bash
cd ~/tmp
curl -O https://apache.claz.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
# list the archive contents, then extract it
tar tvzf spark-3.1.2-bin-hadoop3.2.tgz
tar xvzf spark-3.1.2-bin-hadoop3.2.tgz
# add the two export lines below to ~/.bashrc
vi ~/.bashrc
export SPARK_HOME=/home/vitor/tmp/spark-3.1.2-bin-hadoop3.2
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
# reload the shell configuration
. ~/.bashrc
cd ~/tmp/spark-3.1.2-bin-hadoop3.2/conf
# add the line below to spark-env.sh (conf/ ships a spark-env.sh.template to copy from)
vi spark-env.sh
SPARK_MASTER_HOST=127.0.0.1
cd ~/tmp
start-master.sh
# stop it
# stop-master.sh
# /home/vitor/tmp/spark-3.1.2-bin-hadoop3.2/logs/spark-vitor-org.apache.spark.deploy.master.Master-1-debian.out
# 21/07/23 12:02:49 INFO Master: Starting Spark master at spark://127.0.0.1:7077
# 21/07/23 12:02:49 INFO Master: Running Spark version 3.1.2
# 21/07/23 12:02:49 WARN Utils: Service 'MasterUI' could not bind on port 8080. Attempting port 8081.
# 21/07/23 12:02:49 INFO Utils: Successfully started service 'MasterUI' on port 8081.
# 21/07/23 12:02:49 INFO MasterWebUI: Bound MasterWebUI to 0.0.0.0, and started at http://10.0.2.15:8081
# http://127.0.0.1:8081
# start a worker
start-worker.sh spark://127.0.0.1:7077
# http://127.0.0.1:8081
# Alive Workers: 1
}}}

=== pyspark example ===
==== Install pyspark ====
{{{#!highlight bash
cd ~/tmp
mkdir pyspark-test
cd pyspark-test
# create and activate a virtual environment
sudo apt install python3-venv
python3 -m venv virtenv
. virtenv/bin/activate
pip3 install --upgrade setuptools pip distlib
pip3 install pyspark
# later, in a new shell: activate the environment again and run the test
cd ~/tmp/pyspark-test/
. virtenv/bin/activate
python3 test_spark1.py
}}}
 * Also install [[ApacheHadoop]] to have HDFS available.
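Before running the test scripts, it can help to confirm that the standalone master is actually listening. The following is a minimal sketch using only the Python standard library. It assumes the master runs on 127.0.0.1 and port 7077, as in the log output above; `is_port_open` is a hypothetical helper name, not part of Spark or pyspark.

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, unreachable host and timeouts.
        return False


if __name__ == "__main__":
    # Assumption: SPARK_MASTER_HOST=127.0.0.1 and the master port is 7077.
    host, port = "127.0.0.1", 7077
    state = "reachable" if is_port_open(host, port) else "not reachable"
    print(f"Spark master at spark://{host}:{port} is {state}")
```

A successful TCP connection only shows that something is listening on the port. The master web UI remains the place to check that a worker is registered as alive.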
==== test_spark1.py ====
{{{#!highlight python
from pyspark.sql import SparkSession

# connect to the standalone master started with start-master.sh
master_url = "spark://127.0.0.1:7077"
spark = SparkSession.builder.master(master_url).getOrCreate()
print("spark session created")
spark.stop()
}}}

==== test_spark2.py ====
{{{#!highlight python
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType

master_url = "spark://127.0.0.1:7077"
spark = SparkSession.builder.master(master_url).getOrCreate()
print("spark session created \n\n")
sc = spark.sparkContext

# RDD of (number, string) pairs saved as a sequence file
rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x))
rdd.saveAsSequenceFile("seqfile1")  # creates folder seqfile1 locally with data
print(sorted(sc.sequenceFile("seqfile1").collect()))

data = [('First', 1), ('Second', 2), ('Third', 3), ('Fourth', 4), ('Fifth', 5)]
df = spark.createDataFrame(data)
# Write into HDFS
# hadoop fs -ls /test
df.write.mode('overwrite').csv("hdfs://master:9000/test/example.csv")

textLines = ['bbbb', 'bbb', 'bb', 'b']
df2 = spark.createDataFrame(textLines, StringType())
df2.write.mode('overwrite').text("hdfs://master:9000/test/datab.txt")

# read the text file back from HDFS as an RDD of lines
lines = sc.textFile("hdfs://master:9000/test/datab.txt")
print(lines.collect())
lineLengths = lines.map(lambda line: len(line))  # RDD with the length of each line
totalLength = lineLengths.reduce(lambda a, b: a + b)
print("total length:" + str(totalLength))
minLength = lineLengths.reduce(lambda a, b: a if a <= b else b)
print("min length:" + str(minLength))
maxLength = lineLengths.reduce(lambda a, b: a if a >= b else b)
print("max length:" + str(maxLength))
spark.stop()
}}}
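To see what the three `reduce` calls in test_spark2.py should print without a cluster or HDFS, the same logic can be sketched in plain Python. This is only a local illustration, with `functools.reduce` standing in for `RDD.reduce` and the snake_case variable names chosen here. It does not exercise Spark itself.

```python
from functools import reduce

# Same sample data that test_spark2.py writes to HDFS.
text_lines = ['bbbb', 'bbb', 'bb', 'b']

# Local equivalent of lines.map(lambda line: len(line)).
line_lengths = [len(line) for line in text_lines]

# The same lambdas that test_spark2.py passes to RDD.reduce.
total_length = reduce(lambda a, b: a + b, line_lengths)
min_length = reduce(lambda a, b: a if a <= b else b, line_lengths)
max_length = reduce(lambda a, b: a if a >= b else b, line_lengths)

print("total length:" + str(total_length))  # total length:10
print("min length:" + str(min_length))      # min length:1
print("max length:" + str(max_length))      # max length:4
```

All three functions are commutative and associative, which `RDD.reduce` requires because Spark combines partial results from the partitions in no guaranteed order.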