COMP9313: Big Data Management Revisit
MyExperience
v “Please participate in the myExperience Survey and take the opportunity to share your constructive thoughts on your learning experience. Your contributions help your teachers and shape the future of education at UNSW.”
v You can access the survey by logging into Moodle or accessing myexperience.unsw.edu.au directly.
v The myExperience survey closes on 2023-11-23.
v As mentioned in the WebCMS3 notice, if the response rate from the class is more than 50%, everybody gets 1 bonus mark added to their final mark.
Final exam
v Final written exam (50 pts)
v Six questions in total on five topics
v Four hours (do not wait until the last minute to submit!)
v Online exam; submit through Moodle
v If you are ill on the day of the exam, do not attend it – we will not accept any medical special consideration claims from people who have already attempted the exam.
Chapters Required in Exam
v Hadoop MapReduce (Chapters 1, 2, and 3)
Ø HDFS
Ø MapReduce Concepts and Mechanism
Ø MapReduce algorithm design
v Spark (Chapters 4 and 5)
Ø DataFrame
v Mining Data Streams (Chapter 6)
v Finding Similar Items (Chapter 7)
Ø Shingling, minhash, LSH
Ø Exact solution
v Graph Data Management (Chapter 8)
v NoSQL and Hive (Chapter 9)
Exam Questions
v Question 1: HDFS, MapReduce, and Spark concepts
v Question 2: MapReduce algorithm design (pseudo-code only)
v Question 3: Spark algorithm design
Ø DataFrame
v Question 4: Finding Similar Items
v Question 5: Mining Data Streams
v Question 6: Graph Data Management
Question 0
v (a) (2 marks) Explain the data flow in MapReduce using the word count problem as an example.
v (b) (2 marks) Explain the data flow in Spark using the word count problem as an example.
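For part (b), a minimal PySpark word count sketch that can be used to walk through the Spark data flow (the input and output paths are made-up placeholders):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("WordCount").getOrCreate()
sc = spark.sparkContext

# Data flow: read lines -> flatMap into words -> map to (word, 1) pairs
# -> reduceByKey sums the counts per word (the shuffle groups pairs by key).
lines = sc.textFile("hdfs:///path/to/input.txt")    # hypothetical input path
counts = (lines.flatMap(lambda line: line.split())
               .map(lambda w: (w.lower(), 1))
               .reduceByKey(lambda a, b: a + b))
counts.saveAsTextFile("hdfs:///path/to/output")     # action: triggers the actual execution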
Map and Reduce Functions
v Programmers specify two functions:
Ø map (k1, v1) → list [<k2, v2>]
4 Map transforms the input into key-value pairs to process
Ø reduce (k2, [v2]) → [<k3, v3>]
4 Reduce aggregates the list of values for each key
4 All values with the same key are sent to the same reducer
v Optionally, also:
Ø combine (k2, [v2]) → [<k2, v2>]
Ø partition (k2, number of partitions) → partition for k2
Ø Grouping comparator: controls which keys are grouped together for a single call to Reducer.reduce() function
v The execution framework handles everything else…
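To make these signatures concrete, a minimal word count sketch in the same pseudo-code style as the sample solutions below (Emit is the usual pseudo-code output primitive):

class WordCount
    method map(self, _, line)                 # map (k1, v1) -> list [<k2, v2>]
        foreach word in line.lower().split()
            Emit(word, 1)

    method combine(self, word, counts)        # combine (k2, [v2]) -> [<k2, v2>], local aggregation
        Emit(word, sum(counts))

    method reduce(self, word, counts)         # reduce (k2, [v2]) -> [<k3, v3>]
        Emit(word, sum(counts))               # all values with the same key reach the same reducer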
MapReduce Data Flow
Sample Questions
v Assume that you are given a data set crawled from a location-based social network, in which each line of the data is in the format (userID, a list of locations the user has visited). The task (as addressed in the solution below) is to output, for each location, the list of users who have visited it.
class Question1
    method map(self, userID, list of locations)
        foreach loc in the list of locations
            Emit(loc + "," + userID, "")

    method reduce_init(self)
        current_loc = ""
        current_list = []

    method reduce(self, key, value)
        loc, userID = key.split(",")
        if loc != current_loc
            if current_loc != ""
                Emit(current_loc, current_list)
                current_list = []
            current_list.add(userID)
            current_loc = loc
        else
            current_list.add(userID)

    method reduce_final(self)
        Emit(current_loc, current_list)

In JOBCONF, configure:
    'mapreduce.map.output.key.field.separator': ',',
    'mapreduce.partition.keypartitioner.options': '-k1,1',
    'mapreduce.partition.keycomparator.options': '-k1,1 -k2,2'
Sample Questions
v Given a table shown as below, find out the person(s) with the maximum salary in each department (employees could have the same salary).
(Employee table omitted; its columns include EmployeeID, DepartmentID, and each employee's name and salary.)
v Solution:
Ø Mapper: for each record, Emit(department + “,” + salary, name)
Ø Combiner: find out all persons with the local maximum salary for each department
Ø Reducer: receives data ordered by (department, salary); the first record for a department carries the maximum salary. Keep reading records until a smaller salary is reached, then ignore the rest. Output all persons with this maximum salary in the department (sketched below).
Ø JOBCONF: key partitioned by "-k1,1", sorted by "-k1,1 -k2,2nr" (salary in descending numeric order)
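A minimal pseudo-code sketch of this design, in the same MRJob style as the previous solution (the record field names are assumptions, and the combiner described above is omitted for brevity):

class MaxSalary
    method map(self, _, record)                  # assumed fields: name, department, salary
        Emit(record.department + "," + record.salary, record.name)

    method reduce_init(self)
        current_dept = ""
        max_salary = ""

    method reduce(self, key, name)               # keys arrive sorted by (dept, salary descending)
        dept, salary = key.split(",")
        if dept != current_dept                  # first (i.e., largest) salary of a new department
            current_dept = dept
            max_salary = salary
            Emit(dept, (name, salary))
        else if salary == max_salary             # ties with the maximum salary
            Emit(dept, (name, salary))
        # smaller salaries of the same department are ignored

In JOBCONF: partition by "-k1,1", sort by "-k1,1 -k2,2nr"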
Sample Questions
v Given a large text dataset, find the top-k frequent terms (considering that you can utilize multiple reducers, and the efficiency of your method is evaluated).
v Solution:
Ø Two rounds:
4 First round: compute term frequencies using multiple reducers, with each reducer keeping only its local top-k (see the sketch below).
4 Second round: gather the local top-k lists and compute the final top-k using a single reducer.
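A pseudo-code sketch of the two rounds, in the same style as the other MapReduce solutions (k is a job parameter; the heap details are an illustrative choice):

class Round1                                     # term frequency; each reducer keeps a local top-k
    method map(self, _, line)
        foreach term in line.split()
            Emit(term, 1)
    method combine(self, term, counts)
        Emit(term, sum(counts))
    method reduce_init(self)
        heap = empty min-heap of size k
    method reduce(self, term, counts)
        push (sum(counts), term) into heap; if heap holds more than k entries, pop the smallest
    method reduce_final(self)
        foreach (count, term) in heap
            Emit(term, count)

class Round2                                     # merge the local top-k lists with a single reducer
    method map(self, term, count)
        Emit(None, (term, count))                # one key => all pairs go to one reducer
    method reduce(self, _, pairs)
        output the k pairs with the largest counts, in descending order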
What is RDD
v Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing. Matei Zaharia, et al. NSDI'12
Ø RDD is a distributed memory abstraction that lets programmers perform in-memory computations on large clusters in a fault-tolerant manner.
v Resilient
Ø Fault-tolerant: able to recompute missing or damaged partitions due to node failures.
v Distributed
Ø Data resides on multiple nodes in a cluster.
Ø A collection of partitioned elements, e.g. tuples or other objects (that represent records of the data you work with).
v RDD is the primary data abstraction in Apache Spark and the core of Spark. It enables operations on collections of elements in parallel.
RDD Operations
v Transformation: returns a new RDD.
Ø Nothing gets evaluated when you call a transformation function; it just takes an RDD and returns a new RDD.
Ø Transformation functions include map, filter, flatMap, groupByKey, reduceByKey, aggregateByKey, join, etc.
v Action: evaluates and returns a new value.
Ø When an action function is called on an RDD object, all the data processing queries are computed at that time and the result value is returned.
Ø Action operations include reduce, collect, count, first, take, countByKey, foreach, saveAsTextFile, etc. (illustrated below)
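A small PySpark illustration of the transformation/action difference (the data and names are made up for illustration):

from pyspark import SparkContext

sc = SparkContext(appName="LazyDemo")

nums = sc.parallelize(range(10))
evens = nums.filter(lambda x: x % 2 == 0)    # transformation: nothing is computed yet
doubled = evens.map(lambda x: x * 2)         # transformation: still just builds the lineage
print(doubled.collect())                     # action: the whole pipeline runs now
print(doubled.count())                       # action: recomputed unless doubled.cache() is used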
v DataFrame is more like a table in a traditional relational database: in addition to the data, it also keeps the structural information of the data, i.e., the schema.
Ø RDD[Person]: although Person is the type parameter, the Spark framework itself does not understand the internal structure of the Person class.
Ø A DataFrame provides detailed structural information, so Spark SQL knows exactly which columns the dataset contains, and what the name and type of each column are. Thus, the Spark SQL query optimizer can apply targeted optimizations (see the example below).
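For instance, a minimal PySpark sketch (the column names and values are made up for illustration):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SchemaDemo").getOrCreate()

# The DataFrame carries a schema, so Spark SQL knows the column names and types.
df = spark.createDataFrame(
    [("Alice", 34), ("Bob", 45)],
    ["name", "age"])
df.printSchema()                                 # name: string, age: long
df.filter(df.age > 40).select("name").show()     # the optimizer can prune columns and push filters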
Sample Questions
v RDD: Given a large text file, your task is to find out the top-k most frequent co-occurring term pairs. The co-occurrence of (w, u) is defined as: u and w appear in the same line (this also means that (w, u) and (u, w) are treated equally). Your Spark program should generate a list of k key-value pairs ranked in descending order according to the frequencies, where the keys are the pairs of terms and the values are the co-occurring frequencies. (Hint: you need to define a function which takes an array of terms as input and generates all possible pairs.)
val textFile = sc.textFile(inputFile)
val words = textFile.map(_.toLowerCase.split(" "))
// fill your code here, and store the result in a pair RDD topk
topk.foreach(x => println(x._1, x._2))
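The skeleton above is in Scala; for reference, a sketch of the whole task in PySpark (the value of k, the input path, and the choice to count each pair once per line are assumptions):

from itertools import combinations
from pyspark import SparkContext

sc = SparkContext(appName="TopKPairs")
k = 10                                              # assumed value of k
lines = sc.textFile("hdfs:///path/to/input.txt")    # hypothetical input path

def pairs(words):
    # generate all co-occurring pairs in one line; sorting makes (w, u) and (u, w) identical
    uniq = sorted(set(words))
    return [((a, b), 1) for a, b in combinations(uniq, 2)]

topk = (lines.map(lambda line: line.lower().split())
             .flatMap(pairs)
             .reduceByKey(lambda a, b: a + b)
             .takeOrdered(k, key=lambda x: -x[1]))  # action: the k most frequent pairs
for pair, freq in topk:
    print(pair, freq)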
Sample Questions
v Given a set of marks from different courses (the input format is shown in the first block below, and the expected output in the second), the task is to compute the average mark for every course and sort the result by course_name in alphabetical order.
Input:
student1:course1,90;course2,92;course3,80;course4,79;course5,93
student2:course1,92;course2,77;course5,85
student3:course3,64;course4,97;course5,82
Expected output:
course1:91 course2:84.5 course3:72 course4:88 course5:86.67
v Solution:
from pyspark.sql.functions import split, explode, avg
# assuming a SparkSession named 'spark' is available (e.g., in the pyspark shell)

fileDF = spark.read.text("file:///home/comp9313/tinydoc")
student = fileDF.select(split(fileDF['value'], ':').getItem(0).alias('sid'), split(fileDF['value'], ':').getItem(1).alias('courses'))
scDF = student.withColumn('course', explode(split('courses', ';')))
scDF2 = scDF.select(split(scDF['course'], ',').getItem(0).alias('cname'), split(scDF['course'], ',').getItem(1).alias('mark'))
avgDF = scDF2.groupBy('cname').agg(avg('mark')).orderBy('cname')
Mining Data Streams
v Sampling from a data stream
v Sliding window – counting bits (DGIM)
v Filtering a data stream – (counting) Bloom filter
Mining Data Streams
v Finding Frequent Elements
Ø Boyer-Moore voting algorithm, Misra-Gries algorithm
Ø Lossy counting, count-min sketch
v Counting distinct elements in a data stream – FM-Sketch
Ø Estimate d ≈ c·2^R for scaling constant c ≈ 1.3 (original paper), where R is the position of the leftmost 0 in the bitmap
Ø Positions ≫ log(d) are almost certainly 0, positions ≪ log(d) are almost certainly 1, and there is a fringe of mixed 0s and 1s around position log(d) (see the toy sketch below)
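A toy Python sketch of the FM bitmap idea (the hash function and the example stream are made up for illustration):

import hashlib

def trailing_zeros(x: int) -> int:
    # number of trailing zeros in the binary representation (0 maps to a large value)
    if x == 0:
        return 32
    return (x & -x).bit_length() - 1

def fm_estimate(stream):
    bitmap = 0
    for item in stream:
        h = int(hashlib.md5(str(item).encode()).hexdigest(), 16) & 0xFFFFFFFF
        bitmap |= 1 << trailing_zeros(h)        # set the bit at the trailing-zero position
    R = 0                                       # R = position of the leftmost 0 in the bitmap
    while bitmap & (1 << R):
        R += 1
    return 1.3 * (2 ** R)                       # d ≈ c * 2^R, with c ≈ 1.3

print(fm_estimate(["a", "b", "c", "a", "b", "d", "e"]))   # rough estimate of 5 distinct items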
Sample Questions
v Use an example to explain the reservoir sampling algorithm (sample size s):
Ø Store all of the first s elements of the stream in S
Ø Suppose we have seen n-1 elements, and now the nth element arrives (n > s)
ü With probability s/n, keep the nth element, else discard it
ü If we picked the nth element, then it replaces one of the s elements in the sample S, picked uniformly at random
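A small Python sketch of this procedure (the example stream is made up for illustration):

import random

def reservoir_sample(stream, s):
    S = []
    for n, element in enumerate(stream, start=1):
        if n <= s:
            S.append(element)                  # keep the first s elements
        elif random.random() < s / n:          # keep the nth element with probability s/n
            S[random.randrange(s)] = element   # it replaces a uniformly random element of S
    return S

print(reservoir_sample(range(1, 101), s=10))   # e.g., 10 uniform samples from 1..100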
Sample Questions
Suppose we are maintaining a count of 1s using the DGIM method. We represent a bucket by (i, t), where i is the number of 1s in the bucket and t is the bucket timestamp (time of the most recent 1).
Consider that the current time is 200, window size is 60, and the current list of buckets is: (16, 148) (8, 162) (8, 177) (4, 183) (2, 192) (1, 197) (1, 200). At the next ten clocks, 201 through 210, the stream has 0101010101. What will the sequence of buckets be at the end of these ten inputs?
Sample Solution
v There are five 1s in the stream. Each 1 updates the window as follows:
Ø (1) (16, 148)(8, 162)(8, 177)(4, 183)(2, 192)(1, 197)(1, 200), (1, 202)
=> (16, 148)(8, 162)(8, 177)(4, 183)(2, 192)(2, 200), (1, 202)
Ø (2) (16, 148)(8, 162)(8, 177)(4, 183)(2, 192)(2, 200), (1, 202), (1, 204)
Ø (3) (16, 148)(8, 162)(8, 177)(4, 183)(2, 192)(2, 200), (1, 202), (1, 204), (1, 206)
=> (16, 148)(8, 162)(8, 177)(4, 183)(2, 192)(2, 200), (2, 204), (1, 206)
=> (16, 148)(8, 162)(8, 177)(4, 183)(4, 200), (2, 204), (1, 206)
Ø (4) The window size is 60, so (16, 148) falls outside the window (148 ≤ 208 − 60) and should be dropped.
(16, 148)(8, 162)(8, 177)(4, 183)(4, 200), (2, 204), (1, 206), (1, 208) =>
(8, 162)(8, 177)(4, 183)(4, 200), (2, 204), (1, 206), (1, 208)
Ø (5) (8, 162)(8, 177)(4, 183)(4, 200), (2, 204), (1, 206), (1, 208), (1, 210) => (8, 162)(8, 177)(4, 183)(4, 200), (2, 204), (2, 208), (1, 210)
Sample Questions
v Consider a Bloom filter of size m = 7 (i.e., 7 bits) and 2 hash functions that both take a string (lowercase) as input:
h1(str) = (Σ_{c in str} (c − 'a')) mod 7
h2(str) = str.length mod 7
Here, c − 'a' is the position of the letter c in the 26 alphabetical letters (a = 0), e.g., h1("bd") = (1 + 3) mod 7 = 4.
Ø (i) Given a set of string S = {“hi”, “big”, “data”}, show the update of the Bloom filter
Ø (ii) Given a string “spark”, use the Bloom filter to check whether it is contained in S.
Ø (iii) Given S in (i) and the Bloom filter with 7 bits, what is the percentage of the false positive probability (a correct expression is sufficient: you need not give the actual number)?
Sample Solution
v (i) Insert the strings in S into the filter:
Ø h1("hi") = (7 + 8) mod 7 = 1, h2("hi") = 2 mod 7 = 2
Ø h1("big") = (1 + 8 + 6) mod 7 = 1, h2("big") = 3 mod 7 = 3
Ø h1("data") = (3 + 0 + 19 + 0) mod 7 = 1, h2("data") = 4 mod 7 = 4
Ø After the three insertions, bits 1, 2, 3, and 4 are set to 1.
v (ii) h1("spark") = (18 + 15 + 0 + 17 + 10) mod 7 = 4, h2("spark") = 5 mod 7 = 5
Ø "spark" is not in S, since bit 4 is 1 but bit 5 is 0.
v (iii) With k = # of hash functions, m = # of inserted elements, n = # of bits:
Ø false positive probability = (1 − e^(−km/n))^k = (1 − e^(−2·3/7))^2 ≈ 0.3313
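A small Python sketch of this Bloom filter, using the two hash functions defined in the question:

M = 7  # number of bits

def h1(s):
    return sum(ord(c) - ord('a') for c in s) % M

def h2(s):
    return len(s) % M

bits = [0] * M
for s in ["hi", "big", "data"]:          # part (i): insert the strings in S
    bits[h1(s)] = 1
    bits[h2(s)] = 1
print(bits)                              # bits 1, 2, 3, 4 are set

query = "spark"                          # part (ii): membership test
maybe_in_S = bits[h1(query)] == 1 and bits[h2(query)] == 1
print(maybe_in_S)                        # False: bit 5 is 0, so "spark" is definitely not in S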
Sample Questions
v Assume that we have 5 buckets and three hash functions:
Ø h0(str) = str.length * 2 mod 5
Ø h1(str) = str.length mod 5
Ø h2(str) = (str[0] - 'a') mod 5
Given a stream of terms: "big", "data", "data", "set", "data", "analytics", show the steps of building the CM-sketch. Then, use the built CM-sketch to get the count for the word "data".
v Solution:
Ø big: h0 = 1, h1 = 3, h2 = 1
Ø data: h0 = 3, h1 = 4, h2 = 3
Ø set: h0 = 1, h1 = 3, h2 = 3
Ø analytics: h0 = 3, h1 = 4, h2 = 0
Sample Solution
Initially the sketch is all zeros. Updating it for each arriving term (big, data, data, set, data, analytics) gives the final sketch (rows h0, h1, h2; buckets 0-4):
Ø h0: [0, 2, 0, 4, 0]
Ø h1: [0, 0, 0, 2, 4]
Ø h2: [1, 1, 0, 4, 0]
Query for "data": min(CMS[0][3], CMS[1][4], CMS[2][3]) = min(4, 4, 4) = 4, which is not the correct count (the true count is 3): in every row some other term ("analytics" in rows h0 and h1, "set" in row h2) collides with "data", so each counter overestimates by 1.
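A small Python sketch of this count-min sketch, using the three hash functions from the question:

BUCKETS = 5

def h0(s): return (len(s) * 2) % BUCKETS
def h1(s): return len(s) % BUCKETS
def h2(s): return (ord(s[0]) - ord('a')) % BUCKETS

HASHES = [h0, h1, h2]
cms = [[0] * BUCKETS for _ in HASHES]        # one row of counters per hash function

stream = ["big", "data", "data", "set", "data", "analytics"]
for term in stream:
    for row, h in enumerate(HASHES):
        cms[row][h(term)] += 1               # update every row on each arrival

def estimate(term):
    return min(cms[row][h(term)] for row, h in enumerate(HASHES))

print(cms)                # [[0,2,0,4,0], [0,0,0,2,4], [1,1,0,4,0]]
print(estimate("data"))   # 4 (an overestimate: the true count is 3)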
Finding Similar Items
v The Big Picture
Ø Document → Shingling → the set of strings of length k that appear in the document
Ø → Min-Hashing → signatures: short integer vectors that represent the sets, and reflect their similarity
Ø → Locality-Sensitive Hashing → candidate pairs: those pairs of signatures that we need to test for similarity
Sample Questions
v MinHash:
We want to compute the min-hash signature for two columns, C1 and C2, using two pseudo-random permutations of the rows defined by the following functions:
h1(n) = (3n + 2) mod 7, h2(n) = (2n − 1) mod 7
Here, n is the row number in the original ordering. Instead of explicitly reordering the rows for each hash function, we use the implementation discussed in class, in which we read the data in each column once in sequential order and update the min-hash signatures as we pass through the rows.
Complete the steps of the algorithm and give the resulting signatures for C1 and C2.
h1(0) = 2, h2(0) = 6
h1(1) = 5, h2(1) = 1
h1(2) = 1, h2(2) = 3
h1(3) = 4, h2(3) = 5
h1(4) = 0, h2(4) = 0
(The 0/1 matrix for C1 and C2 and the step-by-step signature-update table from the original slide are not reproduced here.)
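A small Python sketch of the one-pass signature update (the 0/1 columns below are made-up example data, since the original matrix is not shown):

INF = float('inf')

def h1(n): return (3 * n + 2) % 7
def h2(n): return (2 * n - 1) % 7

# Hypothetical input: rows 0-4, a 1 means the element (row) belongs to the column's set.
C1 = [1, 0, 0, 1, 0]
C2 = [0, 0, 1, 1, 1]

sig = {"C1": [INF, INF], "C2": [INF, INF]}    # one signature entry per hash function
for n in range(5):                            # scan the rows once, in order
    for name, col in (("C1", C1), ("C2", C2)):
        if col[n] == 1:                       # only rows where the column has a 1
            sig[name][0] = min(sig[name][0], h1(n))
            sig[name][1] = min(sig[name][1], h2(n))
print(sig)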
Sample Questions
v Suppose we wish to find similar sets, and we do so by minhashing the sets 10 times and then applying locality-sensitive hashing using 5 bands of 2 rows (minhash values) each. If two sets had Jaccard similarity 0.6, what is the probability that they will be identified in the locality-sensitive hashing as candidates (i.e. they hash at least once to the same bucket)? You may assume that there are no coincidences, where two unequal values hash to the same bucket. A correct expression is sufficient: you need not give the actual number.
v Solution: 1 − (1 − t^r)^b
Ø 1 − (1 − 0.6^2)^5 ≈ 0.8926
Single-Source Shortest Path in MapReduce (one iteration, source s)
v Map: reads s → 0 | n1: 10, n2: 5
Ø Emit: (n1, 10), (n2, 5), and the adjacency list (s, <0, (n1: 10, n2: 5)>)
Ø The other adjacency lists are also read and emitted, but they do not contribute here and are thus ignored
v Reduce: receives (n1, 10), (n2, 5), (s, <0, (n1: 10, n2: 5)>)
Ø The adjacency list of each node is also received (ignored in this example)
Ø Emit:
s → 0 | n1: 10, n2: 5
n1 → 10 | n2: 2, n3: 1
n2 → 5 | n1: 3, n3: 9, n4: 2
(shortest-path estimates after iteration 1)
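A toy Python simulation of one such iteration (the graph matches the trace above; the adjacency lists of n3 and n4 are not shown there, so they are omitted, and the helper names are illustrative):

from collections import defaultdict

INF = float('inf')

# node -> (current shortest-distance estimate, adjacency list {neighbor: edge weight})
graph = {
    "s":  (0,   {"n1": 10, "n2": 5}),
    "n1": (INF, {"n2": 2, "n3": 1}),
    "n2": (INF, {"n1": 3, "n3": 9, "n4": 2}),
}

def map_phase(g):
    for node, (dist, adj) in g.items():
        yield node, ("STRUCTURE", adj)              # pass the graph structure along
        if dist < INF:
            for nbr, w in adj.items():
                yield nbr, ("DISTANCE", dist + w)   # tentative distance via this node

def reduce_phase(pairs, old):
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    new_graph = {}
    for node, values in grouped.items():
        adj, best = {}, INF
        for kind, payload in values:
            if kind == "STRUCTURE":
                adj = payload
            else:
                best = min(best, payload)           # smallest tentative distance received
        if node in old:
            best = min(best, old[node][0])          # keep the old estimate if it is smaller
        new_graph[node] = (best, adj)
    return new_graph

print(reduce_phase(map_phase(graph), graph))        # after one iteration: s=0, n1=10, n2=5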
PageRank in MapReduce (One Iteration)
Ø Graph: n1 [n2, n4], n2 [n3, n5], n3 [n4], n4 [n5], n5 [n1, n2, n3]
Ø Map: each node emits its adjacency list, and distributes its current PageRank mass evenly to its out-neighbors
Ø Reduce: each node sums the PageRank mass it receives and re-emits its adjacency list for the next iteration
(The mass-flow diagram from the original slide is not reproduced here.)
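A toy Python simulation of one such iteration over the graph above (starting from a uniform PageRank of 1/5 per node; teleport and dangling-node handling are deliberately omitted, as an illustrative simplification):

from collections import defaultdict

adj = {"n1": ["n2", "n4"], "n2": ["n3", "n5"], "n3": ["n4"],
       "n4": ["n5"], "n5": ["n1", "n2", "n3"]}
rank = {node: 1 / 5 for node in adj}

def map_phase():
    for node, neighbors in adj.items():
        yield node, ("STRUCTURE", neighbors)         # preserve the graph structure
        share = rank[node] / len(neighbors)
        for nbr in neighbors:
            yield nbr, ("MASS", share)               # distribute mass to out-neighbors

def reduce_phase(pairs):
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    new_rank = {}
    for node, values in grouped.items():
        new_rank[node] = sum(v for kind, v in values if kind == "MASS")
    return new_rank

print(reduce_phase(map_phase()))                     # ranks after one iteration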
Sample Questions
v A directed graph G has the set of nodes {1, 2, 3, 4, 5, 6} with the edges arranged as follows (the edge figure is not reproduced here).
v Set up the PageRank equations, assuming β = 0.8 (jump probability = 1- β). Denote the PageRank of node a by r(a).
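For reference, with the random-jump formulation each equation takes the general form (N = 6 nodes, β = 0.8, and out-deg(b) the out-degree of node b):
r(a) = (1 − β)/N + β · Σ_{b: b→a} r(b) / out-deg(b), i.e., r(a) = 0.2/6 + 0.8 · Σ_{b: b→a} r(b) / out-deg(b)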
Thank you!