Parallel Programming With Spark
Improves efficiency through:
» In-memory computing primitives
» General computation graphs
Spark was originally written in Scala, which allows
concise function syntax and interactive use
A Java API for standalone apps was added recently
Concept: resilient distributed datasets (RDDs)
» Immutable collections of objects spread across a cluster
» Built through parallel transformations (map, filter, etc)
» Automatically rebuilt on failure
» Controllable persistence (e.g. caching in RAM) for reuse
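For example, a minimal sketch (assuming a SparkContext named sc and a hypothetical errors.log file); if a node fails, lost partitions are rebuilt by re-running the same transformations:
val logLines = sc.textFile("errors.log")             // RDD built from storage
val errors = logLines.filter(_.contains("ERROR"))    // parallel transformation
errors.cache()                                       // keep in RAM for reuse
errors.count()                                       // first action materializes the cache
errors.filter(_.contains("timeout")).count()         // reuses the cached data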
Main Primitives
Resilient distributed datasets (RDDs)
» Immutable, partitioned collections of objects
Transformations (e.g. map, filter, groupBy, join)
» Lazy operations to build RDDs from other RDDs
Actions (e.g. count, collect, save)
» Return a result or write it to storage
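A small sketch of how the three primitives fit together (assuming a SparkContext sc; names are illustrative):
val data = sc.parallelize(1 to 1000)        // an RDD
val doubled = data.map(_ * 2)               // transformation: lazy, nothing runs yet
val big = doubled.filter(_ > 100)           // still lazy
big.count()                                 // action: triggers the computation
big.collect()                               // action: returns the results to the driver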
RDD Fault Tolerance
RDDs track the series of transformations used to
build them (their lineage) to recompute lost data
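For illustration, a sketch of lineage-based recovery (path and field index are hypothetical); most Spark versions also let you print an RDD's lineage with toDebugString:
val fields = sc.textFile("hdfs://namenode:9000/logs")
               .filter(_.startsWith("ERROR"))
               .map(_.split("\t")(1))
// If a partition of fields is lost, Spark re-runs textFile -> filter -> map
// for just that partition; the recorded lineage can be inspected:
println(fields.toDebugString)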
Easiest way: Spark interpreter (spark-shell)
» Modified version of Scala interpreter for cluster use
Runs in local mode on 1 thread by default, but you can
control this through the MASTER environment variable:
MASTER=local ./spark-shell # local, 1 thread
MASTER=local[2] ./spark-shell # local, 2 threads
MASTER=host:port ./spark-shell # run on Mesos
First Stop: SparkContext
Main entry point to Spark functionality
Created for you in spark-shell as variable sc
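In a standalone app you create it yourself; a minimal sketch (the constructor form and package name vary slightly across Spark versions, e.g. very old releases used spark.SparkContext):
import org.apache.spark.SparkContext
// Run locally on 2 threads; the app name is illustrative
val sc = new SparkContext("local[2]", "MyApp")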
Creating RDDs
// Turn a Scala collection into an RDD
sc.parallelize(List(1, 2, 3))
// Load text file from local FS, HDFS, or S3
sc.textFile("file.txt")
sc.textFile("directory/*.txt")
sc.textFile("hdfs://namenode:9000/path/file")
// Use any existing Hadoop InputFormat
sc.hadoopFile(keyClass, valClass, inputFmt, conf)
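As a concrete illustration of the last call (paths are hypothetical), one common form reads a file through Hadoop's TextInputFormat; the exact argument order of hadoopFile differs between Spark versions, so check your SparkContext's API:
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
// Keys are byte offsets, values are lines of text
val records = sc.hadoopFile("hdfs://namenode:9000/path/file",
  classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
val lineText = records.map { case (_, line) => line.toString }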
Basic Transformations
val nums = sc.parallelize(List(1, 2, 3))
// Pass each element through a function
val squares = nums.map(x => x*x) // => {1, 4, 9}
// Keep elements passing a predicate
val even = squares.filter(_ % 2 == 0) // => {4}
// Map each element to zero or more others
nums.flatMap(x => 1 to x) // => {1, 1, 2, 1, 2, 3}
Working with Key-Value Pairs
Spark’s "distributed reduce" transformations
operate on RDDs of key-value pairs
Scala pair syntax:
val pair = (a, b) // sugar for new Tuple2(a, b)
Accessing pair elements:
pair._1 // => a
pair._2 // => b
Some Key-Value Operations
val pets = sc.parallelize(List(("cat", 1), ("dog", 1), ("cat", 2)))
pets.reduceByKey(_ + _) // => {(cat, 3), (dog, 1)}
pets.groupByKey() // => {(cat, Seq(1, 2)), (dog, Seq(1))}
pets.sortByKey() // => {(cat, 1), (cat, 2), (dog, 1)}
reduceByKey also automatically implements
combiners on the map side
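To see the difference, both lines below compute the same per-key sums over the pets RDD, but reduceByKey combines values within each partition before the shuffle, while groupByKey ships every value across the network first:
pets.reduceByKey(_ + _)             // partial sums combined map-side first
pets.groupByKey().mapValues(_.sum)  // every value crosses the network, then is summed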
Example: Word Count
val lines = sc.textFile("hamlet.txt")
val counts = lines.flatMap(line => line.split(" "))
                  .map(word => (word, 1))
                  .reduceByKey(_ + _)
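Since transformations are lazy, nothing runs until an action is called; for example (output path is hypothetical):
counts.collect().foreach(println)        // bring the word counts back to the driver
counts.saveAsTextFile("counts-output")   // or write them to storage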
Multiple Datasets
val visits = sc.parallelize(List(
  ("index.html", "1.2.3.4"),
  ("about.html", "3.4.5.6"),
  ("index.html", "1.3.3.1")))
val pageNames = sc.parallelize(List(
  ("index.html", "Home"),
  ("about.html", "About")))
visits.join(pageNames)
// ("index.html", ("1.2.3.4", "Home"))
// ("index.html", ("1.3.3.1", "Home"))
// ("about.html", ("3.4.5.6", "About"))
visits.cogroup(pageNames)
// ("index.html", (Seq("1.2.3.4", "1.3.3.1"), Seq("Home")))
// ("about.html", (Seq("3.4.5.6"), Seq("About")))
Controlling the Number of Reduce Tasks
All the pair RDD operations take an optional
second parameter for the number of tasks:
words.reduceByKey(_ + _, 5)
words.groupByKey(5)
visits.join(pageNames, 5)
You can also set the default through the spark.default.parallelism property
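How that property is set depends on the Spark version; two common forms (sketch only):
System.setProperty("spark.default.parallelism", "10")   // older releases: set before creating sc
// or, with SparkConf-based setup:
// val conf = new SparkConf().set("spark.default.parallelism", "10")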
Using Local Variables
Any external variables you use in a closure will
automatically be shipped to the cluster:
val query = Console.readLine()
pages.filter(_.contains(query)).count()
Some caveats:
» Each task gets a new copy (updates aren’t sent back)
» Variable must be Serializable
» Don’t use fields of an outer object (ships all of it! See the sketch below)
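A sketch of the last caveat (class and field names are hypothetical): referring to a field inside a closure captures the whole enclosing object, not just the field.
import org.apache.spark.rdd.RDD
class Search(pages: RDD[String]) {
  val query = "spark"
  // `query` here means `this.query`, so the closure captures the entire Search
  // object (and fails if it is not Serializable):
  def badCount() = pages.filter(_.contains(query)).count()
  // Copy the field into a local variable so only the String is shipped:
  def goodCount() = {
    val q = query
    pages.filter(_.contains(q)).count()
  }
}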
Other RDD Operators
sample(): deterministically sample a subset
union(): merge two RDDs
cartesian(): cross product
pipe(): pass elements through an external program
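Quick sketches of these operators, reusing nums and lines from the earlier examples (the second RDD is hypothetical):
val moreNums = sc.parallelize(List(4, 5))
nums.sample(false, 0.5, 42)    // ~50% random subset, repeatable for a fixed seed
nums.union(moreNums)           // => {1, 2, 3, 4, 5}
nums.cartesian(moreNums)       // => {(1,4), (1,5), (2,4), (2,5), (3,4), (3,5)}
lines.pipe("grep the")         // stream elements through an external command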
Task Scheduler
Runs general task graphs
Pipelines functions where possible
Cache-aware for data reuse and locality
Partitioning-aware to avoid shuffles
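A sketch of what partitioning-awareness buys you, reusing the visits and pageNames RDDs from above (HashPartitioner lives in org.apache.spark in current releases):
import org.apache.spark.HashPartitioner
// Pre-partition and cache visits; later joins on the same keys reuse this
// layout, so the visits side is not shuffled again.
val partitionedVisits = visits.partitionBy(new HashPartitioner(8)).cache()
partitionedVisits.join(pageNames)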