The word "graph" usually evokes the kind of plots that we've all learned about in grade school mathematics. In fact, one has likely plotted simple lines and curves using "graphing paper" or a "graphing calculator" before. The word "graph" can also describe a ubiquitous data structure consisting of edges connecting a set of vertices. This type of graph can be used to describe many different phenomena such as social networks, recommendation engines, transportation systems, and more.

Perhaps the most famous application of graphs is Google's PageRank algorithm, which revolutionized search engine technology. Has a social network ever suggested "friend of a friend" connections to you? Your social network connections are likely modeled using a graph. Has anyone ever told you what your Kevin Bacon Degree of Separation is? That number is the length of a shortest path in a graph of actors connected by shared films.

Graphs also provide us with a powerful computational paradigm. Graph databases can efficiently perform operations that would require a long series of joins in a traditional relational database. Classical problems in statistics and machine learning can be solved as graph problems. For example, undirected graphical models can be used to estimate joint distributions of random variables. Unsupervised learning methods such as spectral clustering can be viewed as a graph partitioning problem.

## Terminology

Like any discipline, graph theory comes with its own nomenclature. We will discuss the small subset required for the rest of the article. The following descriptions are intentionally simplified; more mathematically rigorous definitions can be found in any graph theory textbook.

• Graph — A data structure G = (V, E), where V is a set of vertices and E is a set of edges.
• Vertex — Represents a single entity such as a person or an object (e.g., a username on a social network).
• Edge — Represents a relationship between two vertices (e.g., are these two vertices friends on a social network?).
• Directed Graph vs. Undirected Graph — Denotes whether the relationship represented by edges is symmetric or not (e.g., Twitter user A can follow user B but B might not follow A).
• Subgraph — A set of vertices and edges that are a subset of the full graph's vertices and edges.
• Degree — A vertex measurement quantifying the number of connected edges (e.g., a username vertex on Facebook has a degree of 50 if it has a direct friend relationship with 50 other users).
• Connected Component — A maximal connected subgraph: every vertex in it can reach every other vertex in it, and no vertex outside it can be reached from inside it.
• Shortest Path — The fewest number of edges required to travel between two specific vertices.

Using the above figure:

• This is an undirected graph.
• Vertex B has a degree of 4, vertex I has a degree of 2, and vertex H has a degree of 1.
• The shortest path between A and G is 3 {A-B-E-G}.
• The vertices {E, F, G} and the edges connecting them are considered a subgraph.
• The graph has 2 connected components, {A,B,C,D,E,F,G} and {H,I,J}.
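
The observations above can be reproduced with a few lines of plain Scala. This is a minimal sketch outside of Spark: it assumes the figure's edges are the first nine edges of the mock graph built later in this article, and the names `FigureGraph`, `degree`, and `subgraph` are illustrative only.

```scala
object FigureGraph {
  // Assumed edge list for the 10-vertex figure. The graph is undirected,
  // so the order within each pair carries no meaning.
  val edges: Seq[(String, String)] = Seq(
    "A" -> "B", "B" -> "C", "B" -> "D", "B" -> "E",
    "E" -> "F", "E" -> "G", "F" -> "G",
    "H" -> "I", "J" -> "I"
  )

  // V is recovered from E here; isolated vertices would need to be listed separately.
  val vertices: Set[String] = edges.flatMap { case (u, v) => Seq(u, v) }.toSet

  // Degree = number of edges touching a vertex, counting both endpoints of every edge.
  val degree: Map[String, Int] =
    edges.flatMap { case (u, v) => Seq(u, v) }
      .groupBy(identity)
      .map { case (v, occurrences) => v -> occurrences.size }

  // A subgraph keeps a subset of the vertices and only the edges between them.
  def subgraph(keep: Set[String]): Seq[(String, String)] =
    edges.filter { case (u, v) => keep(u) && keep(v) }
}
```

Under that assumption, `FigureGraph.degree("B")` is 4 and `FigureGraph.subgraph(Set("E", "F", "G"))` keeps the three edges among E, F, and G.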

Graph vertices and edges typically possess properties. For example, a username vertex might carry a "last login date" property. An edge signifying a social network friendship between users might have a "date connected" property. Most graph databases allow multiple edge types between vertices, signifying different types of relationships.
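
As a rough illustration of properties and edge types, a property graph can be modeled with two case classes. The field names (`lastLogin`, `relationship`, `dateConnected`) and the sample values below are hypothetical; they only mirror the examples in the paragraph above and do not correspond to any particular graph database.

```scala
import java.time.LocalDate

object PropertyGraphSketch {
  // A vertex carrying one property.
  final case class Vertex(id: String, lastLogin: LocalDate)

  // An edge carrying a type and one property. Several edges with different
  // `relationship` values may connect the same pair of vertices.
  final case class Edge(src: String, dst: String, relationship: String, dateConnected: LocalDate)

  // Made-up sample data.
  val vertices: Seq[Vertex] = Seq(
    Vertex("alice", LocalDate.of(2018, 7, 1)),
    Vertex("bob", LocalDate.of(2018, 6, 15))
  )

  val edges: Seq[Edge] = Seq(
    Edge("alice", "bob", "friend", LocalDate.of(2017, 3, 9)),
    Edge("alice", "bob", "coworker", LocalDate.of(2018, 1, 22))
  )

  // Relationship types present between two vertices, ignoring edge direction.
  def relationships(a: String, b: String): Set[String] =
    edges.collect {
      case e if Set(e.src, e.dst) == Set(a, b) => e.relationship
    }.toSet
}
```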

In this article, we will keep the examples simple. Vertices and edges will have no properties and the edges themselves will signify a generic undirected connection. The resulting graph computations can easily be extended to more specific use cases.

## Why Apache Spark?

Data workflows usually require the creation of many intermediate or temporary datasets. In a traditional relational database, this typically results in the following problems:

• Unless the data scientist or engineer is being careful, the database can become littered with many temporary or ad-hoc tables that should not be retained. The naming of these tables isn't always obvious—figuring out what can be deleted becomes an arduous or sometimes futile team exercise.
• If subqueries are used in lieu of temporary tables, the SQL can become difficult to read and understand (even if good formatting practices are followed by the practitioner).

SQL logic is often kept in stored procedures, existing in the database object store separately from the orchestrating code. This separation can paint a confusing portrait of the full data workflow.

Apache Spark solves these problems by allowing SQL-like operations to exist alongside the calling logic. Spark is a beautiful coalescence of traditional SQL and imperative (or functional) programming paradigms. Temporary datasets and results can be represented and captured symbolically as variables. Computations are executed in memory where possible, with data written to disk when it does not fit or when the programmer chooses to persist it there. The result is a scalable, organized, and readable data workflow.

There are many graph database options on the market with varying levels of scalability. Many modern and popular relational databases contain the ability to store graph data and query it using a standard graph query language. Graph databases like these are useful for expressively querying, visualizing, and exploring graph data.

While graph computations are important, they are often only a small part of the big data pipeline. At Oracle Data Cloud, we use Spark to process graphs with tens of billions of edges and vertices. However, terabyte-scale ETL is still required before any data science or graph algorithms are executed. Spark gives us a single platform to efficiently process the data and apply both machine learning and graph algorithms.

Data scientists often debate whether to write Spark in Python or Scala. I prefer Scala because of its static typing and the ability to catch errors at compile time. Catching errors before submitting a Spark application avoids spending expensive cluster compute cycles on failed jobs.

## Tutorial Setup

We will use a Jupyter Scala notebook in this tutorial. The directions here are intended for macOS.

• Step 1: Follow the instructions in the jupyter-scala repository (github.com/jupyter-scala/jupyter-scala) to install the Scala kernel for Jupyter notebooks.

• Step 2: If you receive an error like "java.net.BindException: Can't assign requested address: Service 'sparkDriver' failed", follow the directions in the Stack Overflow thread "Mac Spark-Shell Error Initializing SparkContext" (see References).

• Step 3: Run the code below to install Spark dependencies.

// Add the Spark Packages resolver (note: this must be run
// in a separate cell).
import ammonite._, Resolvers._
val newResolver = Resolver.Http("Spark Packages",
  "http://dl.bintray.com/spark-packages/maven", MavenPattern, true)

interp.resolvers() = interp.resolvers() :+ newResolver

// Download Scala dependencies (one magic import per line).
import $exclude.`org.slf4j:slf4j-log4j12`, $ivy.`org.slf4j:slf4j-nop:1.7.21`
import $profile.`hadoop-2.6`
import $ivy.`org.apache.spark::spark-sql:2.1.0`
import $ivy.`org.apache.spark::spark-graphx:2.1.0`
import $ivy.`org.apache.hadoop:hadoop-aws:2.6.4`
import $ivy.`org.jupyter-scala::spark:0.4.2`
import $ivy.`graphframes:graphframes:0.5.0-spark2.1-s_2.11`


## Creating a Graph

Spark has two main graph libraries:

• GraphX, which is older and built on the lower-level RDD API.
• GraphFrames, which uses the DataFrame/Dataset API.

I recommend using GraphFrames as it is more aligned with the current state of Spark. In this tutorial, we will use GraphFrames as well as demonstrate our own custom implementations.

In the following examples, we will work with a 14-vertex graph that has 3 distinct connected components.

### Create a SparkSession

import jupyter.spark.session.JupyterSparkSession

val sparkSession = JupyterSparkSession.builder()
.jupyter()
.appName("notebook")
.master("local")
.getOrCreate()


### Create a Small Mock Graph With DataFrames

GraphFrames will require us to have 3 specific column names:

• src, dst on edges
• id on vertices

// Implicits do not work correctly in the notebook, so we will
// build a DataFrame the long way.
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructType,
StructField}

val edgeSchema = StructType(List(
StructField("src", StringType, false),
StructField("dst", StringType, false)
))

val vertexSchema = StructType(List(
StructField("id", StringType, false)
))

val edgeRDD = sparkSession.sparkContext.parallelize(
Seq(Row("A", "B"),
Row("B", "C"),
Row("B", "D"),
Row("B", "E"),
Row("E", "F"),
Row("E", "G"),
Row("F", "G"),
Row("H", "I"),
Row("J", "I"),
Row("K", "L"),
Row("L", "M"),
Row("M", "N"),
Row("K", "N")
))

val vertexRDD = sparkSession.sparkContext.parallelize(
Seq(Row("A"),
Row("B"),
Row("C"),
Row("D"),
Row("E"),
Row("F"),
Row("G"),
Row("H"),
Row("I"),
Row("J"),
Row("K"),
Row("L"),
Row("M"),
Row("N")
)
)

val edgeDF = sparkSession.createDataFrame(edgeRDD, edgeSchema)
val vertexDF = sparkSession.createDataFrame(vertexRDD, vertexSchema)
// Show that our DataFrame creation worked.
println("Edges:")
edgeDF.show(20, false)

println("Vertices:")
vertexDF.show(20, false)
Edges:
+---+---+
|src|dst|
+---+---+
|A  |B  |
|B  |C  |
|B  |D  |
|B  |E  |
|E  |F  |
|E  |G  |
|F  |G  |
|H  |I  |
|J  |I  |
|K  |L  |
|L  |M  |
|M  |N  |
|K  |N  |
+---+---+

Vertices:
+---+
|id |
+---+
|A  |
|B  |
|C  |
|D  |
|E  |
|F  |
|G  |
|H  |
|I  |
|J  |
|K  |
|L  |
|M  |
|N  |
+---+


### Create a GraphFrames Graph Object

import org.graphframes._
import org.graphframes.GraphFrame

val g = GraphFrame(vertexDF, edgeDF)


## Simple Graph Computations

We will manually implement a few graph computations using DataFrames. For each one, we will then run the equivalent GraphFrames command to "check our answer."

### Compute Degrees With DataFrame Operations

Degree measurements provide insight into the connectivity of the vertices. If the vertices represent web pages and the edges represent the presence of hyperlinks between them, a high degree might signify that the web page is popular and relevant. In other applications, a high degree might help identify fake or erroneous data by revealing unrealistic connectivity.

import org.apache.spark.sql.functions.{count, sum}

// Since a vertex can be on either the src or dst node of the edge,
// we compute the counts for both and then sum the result.
val srcCount = edgeDF.distinct.groupBy("src")
.agg(count("*").alias("connecting_count"))
.withColumnRenamed("src", "id")

val dstCount = edgeDF.distinct.groupBy("dst")
.agg(count("*").alias("connecting_count"))
.withColumnRenamed("dst", "id")

// Union them together and sum the connecting count from both src
// and dst.
val degrees = srcCount.union(dstCount)
.groupBy("id")
.agg(sum("connecting_count").alias("degree"))
degrees.sort("id").show(20, false)
+---+------+
|id |degree|
+---+------+
|A  |1     |
|B  |4     |
|C  |1     |
|D  |1     |
|E  |3     |
|F  |2     |
|G  |2     |
|H  |1     |
|I  |2     |
|J  |1     |
|K  |2     |
|L  |2     |
|M  |2     |
|N  |2     |
+---+------+


### Compute Degrees With GraphFrames

Degrees is a built-in function on the GraphFrame graph object.

g.degrees.sort("id").show(20, false)
+---+------+
|id |degree|
+---+------+
|A  |1     |
|B  |4     |
|C  |1     |
|D  |1     |
|E  |3     |
|F  |2     |
|G  |2     |
|H  |1     |
|I  |2     |
|J  |1     |
|K  |2     |
|L  |2     |
|M  |2     |
|N  |2     |
+---+------+


Our custom implementation matches the GraphFrames result.
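
A second, independent sanity check on a degree table is that, in an undirected graph without self-loops, the degrees sum to twice the number of edges, because every edge contributes to exactly two vertices. The sketch below repeats the src/dst counting with plain Scala collections (no Spark) on the same 13 edges; `DegreeCheck` and its members are illustrative names.

```scala
object DegreeCheck {
  val edges: Seq[(String, String)] = Seq(
    "A" -> "B", "B" -> "C", "B" -> "D", "B" -> "E",
    "E" -> "F", "E" -> "G", "F" -> "G",
    "H" -> "I", "J" -> "I",
    "K" -> "L", "L" -> "M", "M" -> "N", "K" -> "N"
  )

  // Count how often each vertex appears on one side of the (deduplicated) edges.
  private def countBy(side: ((String, String)) => String): Map[String, Int] =
    edges.distinct.groupBy(side).map { case (v, es) => v -> es.size }

  val srcCount: Map[String, Int] = countBy(_._1)
  val dstCount: Map[String, Int] = countBy(_._2)

  // Sum the two counts per vertex, treating a missing count as 0.
  val degrees: Map[String, Int] =
    (srcCount.keySet ++ dstCount.keySet).map { v =>
      v -> (srcCount.getOrElse(v, 0) + dstCount.getOrElse(v, 0))
    }.toMap

  // Handshake check: the total degree equals 2 * |E|.
  val handshakeHolds: Boolean = degrees.values.sum == 2 * edges.distinct.size
}
```

For the table above, the degrees add up to 26, which is twice the 13 edges.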

### Compute Single Source Shortest Path Using Simple Joins

The shortest path algorithm is useful in many applications. For example, map software can use shortest path algorithms to compute the optimal route to a destination. It is often useful for the edges to be weighted (e.g., how much traffic does a road have)—in this example, we will use unweighted edges and compute the shortest path for a single starting and destination vertex.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{udf, array, col, size}

def mirrorEdges(edges: DataFrame): DataFrame = {
val swapped = edges.selectExpr("dst as src", "src as dst")
edges.union(swapped)
}

val appendToSeq = udf((x: Seq[String], y: String) => x ++ Seq(y))

def shortestPath(edges: DataFrame, start: String, end: String): DataFrame = {
  // Mirror edges on the first iteration.
  val mirrored = mirrorEdges(edges)
  mirrored.cache()

  // Filter the edges to our starting vertex and init the path
  // sequence.
  val paths = mirrored.where(s"src = '$start'")
    .select(
      mirrored("src"),
      mirrored("dst"),
      array(mirrored("src"), mirrored("dst")).alias("path")
    )

  // Recursively call until convergence.
  val sp = shortestPathRecurse(paths, mirrored, end)
  sp.withColumn("path_length", size(sp("path")) - 1)
}

// NOTE: the initial one-edge paths are never tested against `end`, and
// there is no stopping rule for unreachable vertices, so `start` and
// `end` are assumed to be connected and at least two edges apart.
def shortestPathRecurse(paths: DataFrame, mirrored: DataFrame, end: String,
                        iteration: Int = 0): DataFrame = {
  // Must alias to avoid common lineage cartesian join confusion.
  // Join the current terminal point for the path to the set of
  // mirrored edges.
  val sp = paths.alias("paths")
    .join(mirrored.alias("mirrored"), col("paths.dst") === col("mirrored.src"))
    .select(
      col("paths.src"),
      col("mirrored.dst"),
      appendToSeq(col("paths.path"), col("mirrored.dst")).alias("path")
    )

  sp.cache()

  // Termination condition is the first traversal that arrives
  // at our destination vertex.
  val filtered = sp.where(s"dst = '$end'")

  if (filtered.count() > 0) {
    filtered
  } else {
    shortestPathRecurse(sp, mirrored, end, iteration + 1)
  }
}
// Compute for A to G.
shortestPath(edgeDF, "A", "G").show(20, false)
+---+---+------------+-----------+
|src|dst|path        |path_length|
+---+---+------------+-----------+
|A  |G  |[A, B, E, G]|3          |
+---+---+------------+-----------+

// There are 2 equal length paths between M and K.
shortestPath(edgeDF, "M", "K").show(20, false)
+---+---+---------+-----------+
|src|dst|path     |path_length|
+---+---+---------+-----------+
|M  |K  |[M, N, K]|2          |
|M  |K  |[M, L, K]|2          |
+---+---+---------+-----------+


### Compute Shortest Path Using GraphFrames

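GraphFrames also has a built-in function on the graph object for computing shortest paths. Instead of the path itself, it returns, for every vertex, a map from each reachable landmark vertex to the number of edges needed to reach it. Edge direction is respected, which is why vertices with no outgoing edges (such as C and D) show empty maps.

val sp = g.shortestPaths.landmarks(Seq("A", "G")).run()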
sp.orderBy("id").show(20, false)
+---+-------------------+
|id |distances          |
+---+-------------------+
|A  |Map(A -> 0, G -> 3)|
|B  |Map(G -> 2)        |
|C  |Map()              |
|D  |Map()              |
|E  |Map(G -> 1)        |
|F  |Map(G -> 1)        |
|G  |Map(G -> 0)        |
|H  |Map()              |
|I  |Map()              |
|J  |Map()              |
|K  |Map()              |
|L  |Map()              |
|M  |Map()              |
|N  |Map()              |
+---+-------------------+


Both implementations show a path of length 3 between vertices A and G.
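
For comparison, the same unweighted distances can be computed with a breadth-first search, which visits vertices in order of increasing distance and never revisits one. The following is a small in-memory sketch in plain Scala rather than Spark; `BfsSketch`, `neighbors`, and `distance` are illustrative names, and edges are treated as undirected, as in the join-based version.

```scala
import scala.annotation.tailrec

object BfsSketch {
  val edges: Seq[(String, String)] = Seq(
    "A" -> "B", "B" -> "C", "B" -> "D", "B" -> "E",
    "E" -> "F", "E" -> "G", "F" -> "G",
    "H" -> "I", "J" -> "I",
    "K" -> "L", "L" -> "M", "M" -> "N", "K" -> "N"
  )

  // Undirected adjacency: every edge is registered in both directions.
  val neighbors: Map[String, Set[String]] =
    edges.flatMap { case (u, v) => Seq(u -> v, v -> u) }
      .groupBy(_._1)
      .map { case (u, pairs) => u -> pairs.map(_._2).toSet }

  // Number of edges on a shortest path, or None if `end` cannot be reached.
  def distance(start: String, end: String): Option[Int] = {
    @tailrec
    def loop(frontier: Set[String], visited: Set[String], depth: Int): Option[Int] =
      if (frontier.contains(end)) Some(depth)
      else if (frontier.isEmpty) None
      else {
        // Expand one level, dropping everything that was already seen.
        val next = frontier.flatMap(v => neighbors.getOrElse(v, Set.empty[String])) -- visited
        loop(next, visited ++ next, depth + 1)
      }

    loop(Set(start), Set(start), 0)
  }
}
```

`BfsSketch.distance("A", "G")` evaluates to `Some(3)`, and `BfsSketch.distance("A", "H")` to `None` because H lies in a different component. The visited set is what guarantees termination when no path exists.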

### Connected Components Using DataFrames

The connected components algorithm segments a graph into its maximal connected subgraphs. While interesting by itself, connected components also form a starting point for other interesting algorithms (e.g., spectral clustering).

The GraphX implementation is built upon the Pregel message paradigm. I have experienced trouble getting the GraphX implementation to scale on a large, highly connected graph. In this example, we will implement the alternating connected components algorithm described in the excellent paper "Connected Components in MapReduce and Beyond" (Kiveris et al., 2014). The alternating method (developed by Google researchers) is purported to scale to graphs with hundreds of billions of edges. The building blocks of this method are the small star and large star operations described in the paper. These two operations are called alternately until convergence.
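
To build intuition for the two operations, here is a minimal in-memory sketch on plain Scala sets. It is not the paper's MapReduce formulation and not the DataFrame code in this article; it assumes the informal descriptions "large star links every strictly larger neighbor to the neighborhood minimum" and "small star links a vertex and its smaller neighbors to the smallest of them". The names `StarSketch`, `mockEdges`, and `components` are illustrative.

```scala
import scala.annotation.tailrec

object StarSketch {
  type Edge = (Int, Int)

  // The article's mock graph with vertices A..N mapped to 1..14.
  val mockEdges: Set[Edge] = Set(
    1 -> 2, 2 -> 3, 2 -> 4, 2 -> 5, 5 -> 6, 5 -> 7, 6 -> 7,
    8 -> 9, 10 -> 9,
    11 -> 12, 12 -> 13, 13 -> 14, 11 -> 14
  )

  // Neighbors of each vertex, treating every edge as undirected.
  private def neighborhoods(edges: Set[Edge]): Map[Int, Set[Int]] =
    edges.toSeq.flatMap { case (u, v) => Seq(u -> v, v -> u) }
      .groupBy(_._1)
      .map { case (u, pairs) => u -> pairs.map(_._2).toSet }

  // Large star: link each strictly larger neighbor of u to the
  // minimum of u's neighborhood (u included).
  def largeStar(edges: Set[Edge]): Set[Edge] =
    neighborhoods(edges).toSeq.flatMap { case (u, ns) =>
      val m = (ns + u).min
      ns.filter(_ > u).map(v => v -> m)
    }.toSet

  // Small star: link u and its smaller neighbors to the minimum among them.
  def smallStar(edges: Set[Edge]): Set[Edge] =
    neighborhoods(edges).toSeq.flatMap { case (u, ns) =>
      val smaller = ns.filter(_ <= u) + u
      val m = smaller.min
      smaller.map(v => v -> m)
    }.toSet

  // Alternate the two operations until the edge set stops changing.
  // At that point every vertex points at the smallest id in its component.
  @tailrec
  def components(edges: Set[Edge]): Map[Int, Int] = {
    val next = smallStar(largeStar(edges))
    if (next == edges) next.toMap else components(next)
  }
}
```

On `StarSketch.mockEdges`, `components` labels vertices 1 to 7 with 1, vertices 8 to 10 with 8, and vertices 11 to 14 with 11.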

import org.apache.spark.sql.DataFrame

// First, assign each vertex an integer value.
// NOTE: This would require efficiency improvements when used on
// large graphs.
// Using SparkSQL for demonstration and convenience.
vertexDF.createOrReplaceTempView("vertexDF")
edgeDF.createOrReplaceTempView("edgeDF")

val vertexMapper = sparkSession.sql(
"""select id, row_number() over(order by id) as int_id
from vertexDF
""")

// Map edges to the new integer id space.
vertexMapper.createOrReplaceTempView("vertexMapper")

val edgeIntDF = sparkSession.sql(
"""select v1.int_id as src, v2.int_id as dst
from edgeDF e
join vertexMapper v1 on v1.id = e.src
join vertexMapper v2 on v2.id = e.dst
"""
)

val vertexIntDF =
vertexMapper.select(vertexMapper("int_id").alias("id"))

edgeIntDF.cache()
vertexIntDF.cache()

def neighborhoodMinimum(edge: DataFrame): DataFrame = {
// For each src, connect to the minimum dst in the set of
// neighboring vertices.
// If src is the minimum, this overrides the neighborhood minimum.
// Retain the dst value for the final select step.
edge.selectExpr(
"src",
"dst",
"least(src, min(dst) over(partition by src)) as arg_min"
)
}

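def smallStar(edge: DataFrame): DataFrame = {
  // Orient every edge so that src holds the larger id and dst the
  // smaller one.
  val sorted = edge.selectExpr(
    "case when dst <= src then src else dst end as src",
    "case when dst <= src then dst else src end as dst"
  )

  val withArgMin = neighborhoodMinimum(sorted)
  withArgMin.cache()

  // Link each vertex, and each of its smaller neighbors, to the
  // neighborhood minimum.
  val selfLinks = withArgMin.selectExpr("src",
    "arg_min as dst").distinct()
  val neighborLinks = withArgMin.selectExpr("dst as src",
    "arg_min as dst")

  selfLinks.union(neighborLinks)
}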

def largeStar(edge: DataFrame): DataFrame = {
val swapped = edge.selectExpr("dst as src", "src as dst")
val bothDirections = edge.union(swapped)
val withArgMin = neighborhoodMinimum(bothDirections)

withArgMin.where("dst > src").selectExpr("src as o", "dst as src",
"arg_min as dst")
}

def alternatingCC(edge: DataFrame, iteration: Int = 0,
                  maxIterations: Int = Int.MaxValue): DataFrame = {
  // Compute large star and then compute small star on the output
  // of large star.
  val ls = largeStar(edge).distinct()
  val ss = smallStar(ls).distinct()

  // Finds links from the previous recursion that are no longer
  // present in the current one.
  // Used for checking convergence.
  val intersection = edge.join(ss, ss("src") === edge("src") &&
    ss("dst") === edge("dst"),
    joinType = "left_anti")

  if (iteration == maxIterations) {
    throw new Exception(
      s"Convergence was not achieved within $maxIterations iterations.")
  } else if (intersection.count() != 0) {
    // Iterate again: links changed between the current
    // and previous recursion.
    alternatingCC(ss, iteration + 1, maxIterations)
  } else {
    // We have converged.
    println(s"Converged in $iteration iterations!")
    ss.withColumnRenamed("src", "id")
      .withColumnRenamed("dst", "connected_component")
  }
}
alternatingCC(edgeIntDF).sort("id").show(20, false)
Converged in 2 iterations!
+---+-------------------+
|id |connected_component|
+---+-------------------+
|1  |1                  |
|2  |1                  |
|3  |1                  |
|4  |1                  |
|5  |1                  |
|6  |1                  |
|7  |1                  |
|8  |8                  |
|9  |8                  |
|10 |8                  |
|11 |11                 |
|12 |11                 |
|13 |11                 |
|14 |11                 |
+---+-------------------+


### Connected Components Using GraphFrames

Once again, GraphFrames provides a simple API for computing connected components.

sparkSession.sparkContext.setCheckpointDir("./spark-checkpoint")
val cc = g.connectedComponents.run()
cc.show(20, false)
+---+------------+
|id |component   |
+---+------------+
|A  |171798691840|
|B  |171798691840|
|C  |171798691840|
|D  |171798691840|
|E  |171798691840|
|F  |171798691840|
|G  |171798691840|
|H  |807453851648|
|I  |807453851648|
|J  |807453851648|
|K  |0           |
|L  |0           |
|M  |0           |
|N  |0           |
+---+------------+


The output from GraphFrames reflects a different integer assignment algorithm for the connected component ID. However, it still identifies the same 3 distinct connected components that our custom implementation discovered.
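
Because the two implementations label components with different integers, comparing them means comparing partitions rather than labels. A minimal sketch in plain Scala follows, with both label assignments copied from the outputs above and the integer ids 1 to 14 assumed to map back to A through N in alphabetical order; `PartitionCheck` and `partition` are illustrative names.

```scala
object PartitionCheck {
  val ids: Seq[String] = ('A' to 'N').map(_.toString)

  // Labels from the custom alternating implementation (ids 1..14 mapped back to A..N).
  val custom: Map[String, Long] =
    ids.zip(Seq[Long](1, 1, 1, 1, 1, 1, 1, 8, 8, 8, 11, 11, 11, 11)).toMap

  // Labels from the GraphFrames output.
  val graphFrames: Map[String, Long] =
    ids.zip(
      Seq.fill(7)(171798691840L) ++ Seq.fill(3)(807453851648L) ++ Seq.fill(4)(0L)
    ).toMap

  // Forget the label values: keep only the groups of vertices sharing a label.
  def partition(labels: Map[String, Long]): Set[Set[String]] =
    labels.groupBy(_._2).values.map(_.keySet).toSet

  val samePartition: Boolean = partition(custom) == partition(graphFrames)
}
```

Here `PartitionCheck.samePartition` is `true`: both labelings split the 14 vertices into the same three groups.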

## Conclusion

In this article, we reviewed some basics of graph theory and implemented native DataFrame solutions for computing vertex degrees, shortest paths, and connected components. We then demonstrated the same calculations using the GraphFrames Spark package and compared the results.

Graph problems have caught the attention of mathematicians and computer scientists for centuries. Modern graph theory began when Leonhard Euler negatively resolved the Seven Bridges of Königsberg problem in 1736. Today, we have access to large-scale computational tools that historical practitioners could never have dreamed of. Apache Spark is one such tool for performing graph computations at scale.

### References

Bondy, J. A., and U. S. R. Murty. Graph Theory with Applications. Wiley, 2002.

“GraphFrames Overview.” Overview - GraphFrames 0.5.0 Documentation, graphframes.github.io/.

“Graph Properties & Measurements-Wolfram Language Documentation.” Experimental Errors and Error Analysis, reference.wolfram.com/language/guide/GraphPropertiesAndMeasurements.html.

“GraphX | Apache Spark.” Apache Spark™ - Unified Analytics Engine for Big Data, spark.apache.org/graphx/.

Hastie, Trevor, et al. The Elements of Statistical Learning: Data Mining, Inference, and Prediction. Springer, 2017.

Jupyter Scala. “Jupyter-Scala/Jupyter-Scala.” GitHub, 5 July 2017, github.com/jupyter-scala/jupyter-scala.

Kiveris, Raimondas, et al. “Connected Components in MapReduce and Beyond.” Proceedings of the ACM Symposium on Cloud Computing - SOCC '14, 2014, doi:10.1145/2670979.2670997.

Kumar, Shirish. “Connected Component Using Map-Reduce on Apache Spark.” LinkedIn, 12 Apr. 2017, www.linkedin.com/pulse/connected-component-using-map-reduce-apache-spark-shirish-kumar/.

“Mac Spark-Shell Error Initializing SparkContext.” Stack Overflow, stackoverflow.com/questions/34601554/mac-spark-shell-error-initializing-sparkcontext.

“PageRank.” Wikipedia, Wikimedia Foundation, 31 July 2018, en.wikipedia.org/wiki/PageRank.

“Seven Bridges of Königsberg.” Wikipedia, Wikimedia Foundation, 3 July 2018, en.wikipedia.org/wiki/Seven_Bridges_of_K%C3%B6nigsberg.

“Six Degrees of Kevin Bacon.” Wikipedia, Wikimedia Foundation, 3 July 2018, en.wikipedia.org/wiki/Six_Degrees_of_Kevin_Bacon.

Author
Bryan Johnson

Bryan Johnson is a Principal Data Scientist on the Identity Graph team at Oracle Data Cloud. Previously, he worked on software development and actuarial predictive modeling in the commercial auto insurance industry. Bryan earned a bachelor's and master's degree in Mathematics from the University of Nebraska - Omaha.