This is the second tutorial in the Spark RDDs vs DataFrames vs SparkSQL blog post series. The first one is available at DataScience+. In the first part, I showed how to retrieve, sort and filter data using Spark RDDs, DataFrames, and SparkSQL. In this tutorial, we will see how to work with multiple tables in Spark the RDD way, the DataFrame way and the SparkSQL way.
If you like this tutorial series, also check out my other recent blog posts on Spark: Analyzing the Bible and the Quran using Spark and Spark DataFrames: Exploring Chicago Crimes. The data and the notebooks can be downloaded from my GitHub repository. The data is not large, but the same code works for large volumes as well, so we can practice on this dataset to master the functionalities of Spark.
For this tutorial, we will work with the SalesLTProduct.txt, SalesLTSalesOrderHeader.txt, SalesLTCustomer.txt, SalesLTAddress.txt and SalesLTCustomerAddress.txt datasets. Let’s answer a couple of questions the Spark Resilient Distributed Dataset (RDD) way, the DataFrame way and the SparkSQL way.
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext

conf = SparkConf().setAppName("miniProject").setMaster("local[*]")
sc = SparkContext.getOrCreate(conf)
sqlcontext = SQLContext(sc)
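A side note: on Spark 2.0 and later, SparkSession is the preferred entry point and wraps both SparkContext and SQLContext. The rest of this tutorial keeps the SQLContext above, but a minimal sketch of the equivalent setup would look like this:

from pyspark.sql import SparkSession

# Spark 2.x+ entry point; replaces the separate SparkContext/SQLContext pair
spark = (SparkSession.builder
         .appName("miniProject")
         .master("local[*]")
         .getOrCreate())
sc = spark.sparkContext  # still usable for the RDD examples below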
Retrieve customer orders
As an initial step towards generating an invoice report, write a query that returns the company name from SalesLTCustomer.txt, and the sales order ID and total due from SalesLTSalesOrderHeader.txt.
RDD way
orderHeader = sc.textFile("SalesLTSalesOrderHeader.txt")
customer = sc.textFile("SalesLTCustomer.txt")
Now, let’s separate the column names (the header line) from the contents.
customer_header = customer.first()
customer_rdd = customer.filter(lambda line: line != customer_header)
orderHeader_header = orderHeader.first()
orderHeader_rdd = orderHeader.filter(lambda line: line != orderHeader_header)
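A caveat worth noting: the filter above drops every line that happens to equal the header, not just the first line. That is harmless for these files, but if you ever need to remove strictly the first line, a sketch like the following would do it (the customer_rdd_alt name is just for illustration; the tutorial continues with the filter approach):

import itertools

# Skip only the first element of the first partition, i.e. the header line
customer_rdd_alt = customer.mapPartitionsWithIndex(
    lambda idx, it: itertools.islice(it, 1, None) if idx == 0 else it)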
We need only CustomerID and CompanyName from the customer RDD, and CustomerID, SalesOrderID and TotalDue from the orderHeader RDD. We then join the two RDDs with an inner join. Finally, we display the 10 companies with the highest amount due. Note that takeOrdered returns the smallest elements by key, so we negate the total due to sort in descending order.
customer_rdd1 = customer_rdd.map(lambda line: (line.split("\t")[0],  # CustomerID
                                               line.split("\t")[7]   # CompanyName
                                               ))
orderHeader_rdd1 = orderHeader_rdd.map(lambda line: (line.split("\t")[10],          # CustomerID
                                                     (line.split("\t")[0],          # SalesOrderID
                                                      float(line.split("\t")[-4]))  # TotalDue
                                                     ))
invoice1 = customer_rdd1.join(orderHeader_rdd1)
invoice1.takeOrdered(10, lambda x: -x[1][1][1])

[('29736', ('Action Bicycle Specialists', ('71784', 119960.824))),
 ('30050', ('Metropolitan Bicycle Supply', ('71936', 108597.9536))),
 ('29546', ('Bulk Discount Store', ('71938', 98138.2131))),
 ('29957', ('Eastside Department Store', ('71783', 92663.5609))),
 ('29796', ('Riding Cycles', ('71797', 86222.8072))),
 ('29929', ('Many Bikes Store', ('71902', 81834.9826))),
 ('29932', ('Instruments and Parts Company', ('71898', 70698.9922))),
 ('29660', ('Extreme Riding Supplies', ('71796', 63686.2708))),
 ('29938', ('Trailblazing Sports', ('71845', 45992.3665))),
 ('29485', ('Professional Sales and Service', ('71782', 43962.7901)))]
Once we collect the RDD resulting from our transformations and actions, we can use other Python packages to visualize the data.
import pandas as pd

top10 = invoice1.takeOrdered(10, lambda x: -x[1][1][1])
companies = [x[1][0] for x in top10]
total_due = [x[1][1][1] for x in top10]
top10_dict = {"companies": companies, "total_due": total_due}
top10_pd = pd.DataFrame(top10_dict)
import matplotlib.pyplot as plt
%matplotlib inline

top10_pd.plot(figsize = (20, 10), kind = "bar", legend = False,
              x = "companies", y = "total_due")
plt.xlabel("")
plt.ylabel("Total Due", fontsize = 18)
plt.title("Total Due of the Top 10 Companies by Amount Due", fontsize = 24)
plt.xticks(size = 20)
plt.yticks(size = 20)
plt.show()
DataFrame way
First, we create DataFrames from the RDDs, using the first row as the schema.
customer_df = sqlcontext.createDataFrame(customer_rdd.map(lambda line: line.split("\t")),
                                         schema = customer.first().split("\t"))
orderHeader_df = sqlcontext.createDataFrame(orderHeader_rdd.map(lambda line: line.split("\t")),
                                            schema = orderHeader.first().split("\t"))
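As an aside, on Spark 2.x you could skip the RDD detour and have the reader parse the tab-delimited files directly; a minimal sketch, assuming a SparkSession named spark as in the setup note above:

# header=True uses the first line as column names; adding inferSchema=True
# would even type TotalDue as a double, avoiding the cast done later
customer_df2 = spark.read.csv("SalesLTCustomer.txt", sep="\t", header=True)
orderHeader_df2 = spark.read.csv("SalesLTSalesOrderHeader.txt", sep="\t", header=True)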
Now, let’s join the two DataFrames on the CustomerID column, using an inner join. We order the rows by the TotalDue column in descending order, but the result does not look right. As the schema of the joined DataFrame shows, TotalDue is a string, so the ordering is lexicographic rather than numeric. We therefore have to convert that column to a numeric type.
joined = customer_df.join(orderHeader_df, 'CustomerID', how = "inner")
joined.select(["CustomerID", 'CompanyName', 'SalesOrderID', 'TotalDue'])\
      .orderBy("TotalDue", ascending = False).show(10, truncate = False)

+----------+-----------------------------+------------+----------+
|CustomerID|CompanyName                  |SalesOrderID|TotalDue  |
+----------+-----------------------------+------------+----------+
|29546     |Bulk Discount Store          |71938       |98138.2131|
|29847     |Good Toys                    |71774       |972.785   |
|29957     |Eastside Department Store    |71783       |92663.5609|
|30072     |West Side Mart               |71776       |87.0851   |
|29796     |Riding Cycles                |71797       |86222.8072|
|29929     |Many Bikes Store             |71902       |81834.9826|
|29531     |Remarkable Bike Store        |71935       |7330.8972 |
|29932     |Instruments and Parts Company|71898       |70698.9922|
|30033     |Transport Bikes              |71856       |665.4251  |
|29660     |Extreme Riding Supplies      |71796       |63686.2708|
+----------+-----------------------------+------------+----------+
joined.printSchema()

root
 |-- CustomerID: string (nullable = true)
 |-- NameStyle: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- FirstName: string (nullable = true)
 |-- MiddleName: string (nullable = true)
 |-- LastName: string (nullable = true)
 |-- Suffix: string (nullable = true)
 |-- CompanyName: string (nullable = true)
 |-- SalesPerson: string (nullable = true)
 |-- EmailAddress: string (nullable = true)
 |-- Phone: string (nullable = true)
 |-- PasswordHash: string (nullable = true)
 |-- PasswordSalt: string (nullable = true)
 |-- rowguid: string (nullable = true)
 |-- ModifiedDate: string (nullable = true)
 |-- SalesOrderID: string (nullable = true)
 |-- RevisionNumber: string (nullable = true)
 |-- OrderDate: string (nullable = true)
 |-- DueDate: string (nullable = true)
 |-- ShipDate: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- OnlineOrderFlag: string (nullable = true)
 |-- SalesOrderNumber: string (nullable = true)
 |-- PurchaseOrderNumber: string (nullable = true)
 |-- AccountNumber: string (nullable = true)
 |-- ShipToAddressID: string (nullable = true)
 |-- BillToAddressID: string (nullable = true)
 |-- ShipMethod: string (nullable = true)
 |-- CreditCardApprovalCode: string (nullable = true)
 |-- SubTotal: string (nullable = true)
 |-- TaxAmt: string (nullable = true)
 |-- Freight: string (nullable = true)
 |-- TotalDue: string (nullable = true)
 |-- Comment: string (nullable = true)
 |-- rowguid: string (nullable = true)
 |-- ModifiedDate: string (nullable = true)
from pyspark.sql.functions import col, udf
from pyspark.sql.types import DoubleType

convert = udf(lambda x: float(x), DoubleType())
Now, let’s change the TotalDue column to numeric.
joined2 = joined.withColumn('Total_Due', convert(col("TotalDue"))).drop("TotalDue")
joined2.dtypes[-1]  # we have created a new column with double type

('Total_Due', 'double')
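As an aside, the UDF gets the job done, but Spark's built-in cast performs the same conversion inside the JVM and avoids the Python serialization overhead; a sketch, assuming the joined DataFrame from above:

# Equivalent to the UDF, but evaluated natively by Spark
joined2_alt = joined.withColumn('Total_Due', col("TotalDue").cast(DoubleType()))\
                    .drop("TotalDue")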
joined2.select(["CustomerID", 'CompanyName', 'SalesOrderID', 'Total_Due'])\
       .orderBy("Total_Due", ascending = False).show(10, truncate = False)

+----------+------------------------------+------------+-----------+
|CustomerID|CompanyName                   |SalesOrderID|Total_Due  |
+----------+------------------------------+------------+-----------+
|29736     |Action Bicycle Specialists    |71784       |119960.824 |
|30050     |Metropolitan Bicycle Supply   |71936       |108597.9536|
|29546     |Bulk Discount Store           |71938       |98138.2131 |
|29957     |Eastside Department Store     |71783       |92663.5609 |
|29796     |Riding Cycles                 |71797       |86222.8072 |
|29929     |Many Bikes Store              |71902       |81834.9826 |
|29932     |Instruments and Parts Company |71898       |70698.9922 |
|29660     |Extreme Riding Supplies       |71796       |63686.2708 |
|29938     |Trailblazing Sports           |71845       |45992.3665 |
|29485     |Professional Sales and Service|71782       |43962.7901 |
+----------+------------------------------+------------+-----------+
only showing top 10 rows
The result above is the same as the result we got using the RDD way above.
Running SQL Queries Programmatically
First, let’s create local temporary views of the DataFrames; then we can use normal SQL commands to get the 10 companies with the highest amount due.
orderHeader_df.createOrReplaceTempView("orderHeader_table")
customer_df.createOrReplaceTempView("customer_table")

sqlcontext.sql("SELECT c.CustomerID, c.CompanyName, oh.SalesOrderID, cast(oh.TotalDue AS DECIMAL(10,4)) \
               FROM customer_table AS c INNER JOIN orderHeader_table AS oh ON c.CustomerID = oh.CustomerID \
               ORDER BY TotalDue DESC LIMIT 10").show(10, truncate = False)
We see that the results we got using the above three methods, RDD way, DataFrame and with SparkSQL, are the same.
Retrieve customer orders with addresses
Extend your customer orders query to include the Main Office address for each customer, including the full street address, city, state or province, and country or region. Note that each customer can have multiple addresses in the SalesLTAddress.txt, so the SalesLTCustomerAddress.txt dataset enables a many-to-many relationship between customers and addresses. Your query will need to include both of these datasets and should filter the join to SalesLTCustomerAddress.txt so that only Main Office addresses are included.
RDD way
I will not repeat some of the steps I did in question 1 above.
As we can see below, the datasets for this question are also tab delimited.
address = sc.textFile("SalesLTAddress.txt")
customer_address = sc.textFile("SalesLTCustomerAddress.txt")
customer_address.first()

'CustomerID\tAddressID\tAddressType\trowguid\tModifiedDate'

address.first()

'AddressID\tAddressLine1\tAddressLine2\tCity\tStateProvince\tCountryRegion\tPostalCode\trowguid\tModifiedDate'
Removing the column names from the RDDs.
address_header = address.first()
address_rdd = address.filter(lambda line: line != address_header)
customer_address_header = customer_address.first()
customer_address_rdd = customer_address.filter(lambda line: line != customer_address_header)
Next, include only the rows with an AddressType of Main Office, split the lines, and retain only the fields of interest.
customer_address_rdd1 = customer_address_rdd.filter(lambda line: line.split("\t")[2] == "Main Office")\
                                            .map(lambda line: (line.split("\t")[0],  # CustomerID
                                                               line.split("\t")[1]   # AddressID
                                                               ))
address_rdd1 = address_rdd.map(lambda line: (line.split("\t")[0],   # AddressID
                                             (line.split("\t")[1],  # AddressLine1
                                              line.split("\t")[3],  # City
                                              line.split("\t")[4],  # StateProvince
                                              line.split("\t")[5])  # CountryRegion
                                             ))
We can now join them.
rdd = customer_rdd1.join(orderHeader_rdd1).join(customer_address_rdd1)\
                   .map(lambda line: (line[1][1],          # AddressID
                                      (line[1][0][0],      # CompanyName
                                       line[1][0][1][0],   # SalesOrderID
                                       line[1][0][1][1])   # TotalDue
                                      ))
final_rdd = rdd.join(address_rdd1)
Let’s rearrange the columns.
final_rdd2 = final_rdd.map(lambda line: (line[1][0][0],         # CompanyName
                                         float(line[1][0][2]),  # TotalDue
                                         line[1][1][0],         # AddressLine1
                                         line[1][1][1],         # City
                                         line[1][1][2],         # StateProvince
                                         line[1][1][3]          # CountryRegion
                                         ))
Let’s see 10 companies with the highest amount due.
final_rdd2.takeOrdered(10, lambda x: -x[1])

[('Action Bicycle Specialists', 119960.824, 'Warrington Ldc Unit 25/2', 'Woolston', 'England', 'United Kingdom'),
 ('Metropolitan Bicycle Supply', 108597.9536, 'Paramount House', 'London', 'England', 'United Kingdom'),
 ('Bulk Discount Store', 98138.2131, '93-2501, Blackfriars Road,', 'London', 'England', 'United Kingdom'),
 ('Eastside Department Store', 92663.5609, '9992 Whipple Rd', 'Union City', 'California', 'United States'),
 ('Riding Cycles', 86222.8072, 'Galashiels', 'Liverpool', 'England', 'United Kingdom'),
 ('Many Bikes Store', 81834.9826, 'Receiving', 'Fullerton', 'California', 'United States'),
 ('Instruments and Parts Company', 70698.9922, 'Phoenix Way, Cirencester', 'Gloucestershire', 'England', 'United Kingdom'),
 ('Extreme Riding Supplies', 63686.2708, 'Riverside', 'Sherman Oaks', 'California', 'United States'),
 ('Trailblazing Sports', 45992.3665, '251340 E. South St.', 'Cerritos', 'California', 'United States'),
 ('Professional Sales and Service', 43962.7901, '57251 Serene Blvd', 'Van Nuys', 'California', 'United States')]
DataFrame way
Now, we can create DataFrames from the RDDs and perform the joins.
address_df = sqlcontext.createDataFrame(address_rdd.map(lambda line: line.split("\t")),
                                        schema = address_header.split("\t"))
customer_address_df = sqlcontext.createDataFrame(customer_address_rdd.map(lambda line: line.split("\t")),
                                                 schema = customer_address_header.split("\t"))
We can see the schema of each DataFrame.
address_df.printSchema()

root
 |-- AddressID: string (nullable = true)
 |-- AddressLine1: string (nullable = true)
 |-- AddressLine2: string (nullable = true)
 |-- City: string (nullable = true)
 |-- StateProvince: string (nullable = true)
 |-- CountryRegion: string (nullable = true)
 |-- PostalCode: string (nullable = true)
 |-- rowguid: string (nullable = true)
 |-- ModifiedDate: string (nullable = true)
customer_address_df.printSchema()

root
 |-- CustomerID: string (nullable = true)
 |-- AddressID: string (nullable = true)
 |-- AddressType: string (nullable = true)
 |-- rowguid: string (nullable = true)
 |-- ModifiedDate: string (nullable = true)
Now we can finally join the DataFrames, but to order the rows by the total amount due, we first have to convert that column to numeric. Note that passing the column name as the join key (rather than a join expression) keeps a single copy of that column in the result.
joined = (customer_df.join(orderHeader_df, 'CustomerID', how = "inner")
                     .join(customer_address_df, 'CustomerID', how = "inner")
                     .join(address_df, 'AddressID', how = "inner"))
joined2 = joined.withColumn('Total_Due', convert(col("TotalDue"))).drop("TotalDue")\
                .filter(joined['AddressType'] == "Main Office")
joined2.select(['CompanyName', 'Total_Due', 'AddressLine1', 'City', 'StateProvince', 'CountryRegion'])\
       .orderBy('Total_Due', ascending = False).show(10, truncate = False)
The answer using the DataFrame way is the same as the answer we got above using the RDD way.
Running SQL Queries Programmatically
As shown below, the answer using SQL, after creating a local temporary view, also gives the same answer as the RDD way and DataFrame way above.
address_df.createOrReplaceTempView("address_table")
customer_address_df.createOrReplaceTempView("customer_address_table")

sqlcontext.sql("SELECT c.CompanyName, cast(oh.TotalDue AS DECIMAL(10,4)), a.AddressLine1, \
               a.City, a.StateProvince, a.CountryRegion FROM customer_table AS c \
               INNER JOIN orderHeader_table AS oh ON c.CustomerID = oh.CustomerID \
               INNER JOIN customer_address_table AS ca ON c.CustomerID = ca.CustomerID AND ca.AddressType = 'Main Office' \
               INNER JOIN address_table AS a ON ca.AddressID = a.AddressID \
               ORDER BY TotalDue DESC LIMIT 10").show(truncate = False)
Retrieve a list of all customers and their orders
The sales manager wants a list of all customer companies and their contacts (first name and last name), showing the sales order ID and total due for each order they have placed. Customers who have not placed any orders should be included at the bottom of the list with NULL values for the order ID and total due.
RDD way
Let’s create the RDDs, select the fields of interest and join them.
customer_header = customer.first()
customer_rdd = customer.filter(lambda line: line != customer_header)
orderHeader_header = orderHeader.first()
orderHeader_rdd = orderHeader.filter(lambda line: line != orderHeader_header)
customer_rdd1 = customer_rdd.map(lambda line: (line.split("\t")[0],   # CustomerID
                                               (line.split("\t")[3],  # FirstName
                                                line.split("\t")[5])  # LastName
                                               ))
orderHeader_rdd1 = orderHeader_rdd.map(lambda line: (line.split("\t")[10],   # CustomerID
                                                     (line.split("\t")[0],   # SalesOrderID
                                                      line.split("\t")[-4])  # TotalDue
                                                     ))
We have to handle customers that have placed orders and those that have not separately, and then union the two RDDs at the end.
joined = customer_rdd1.leftOuterJoin(orderHeader_rdd1)
NonNulls = joined.filter(lambda line: line[1][1] is not None)
Nulls = joined.filter(lambda line: line[1][1] is None)
Let’s see the data structures for both of them.
NonNulls.take(5)

[('30113', (('Raja', 'Venugopal'), ('71780', '42452.6519'))),
 ('30089', (('Michael John', 'Troyer'), ('71815', '1261.444'))),
 ('29485', (('Catherine', 'Abel'), ('71782', '43962.7901'))),
 ('29638', (('Rosmarie', 'Carroll'), ('71915', '2361.6403'))),
 ('29938', (('Frank', 'Campbell'), ('71845', '45992.3665')))]
Let’s rearrange them.
NonNulls2 = NonNulls.map(lambda line: (line[0], line[1][0][0], line[1][0][1],
                                       line[1][1][0], float(line[1][1][1])))
NonNulls2.first()

('30113', 'Raja', 'Venugopal', '71780', 42452.6519)
Similarly, let’s rearrange the Nulls RDD.
Nulls.take(5)

[('190', (('Mark', 'Lee'), None)),
 ('30039', (('Robert', 'Stotka'), None)),
 ('110', (('Kendra', 'Thompson'), None)),
 ('29832', (('Jésus', 'Hernandez'), None)),
 ('473', (('Kay', 'Krane'), None))]
Nulls2 = Nulls.map(lambda line: (line[0], line[1][0][0], line[1][0][1], "NULL", "NULL"))
Now, we can union them and then take the top five and bottom five, as sketched after the union.
union_rdd = NonNulls2.union(Nulls2)
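To actually inspect the top five and bottom five, one option is takeOrdered and take again. A minimal sketch, assuming the union_rdd from above, where customers without orders carry the string "NULL" in the total-due slot:

# Top five by total due: only rows with a real (float) total qualify
union_rdd.filter(lambda x: x[4] != "NULL").takeOrdered(5, lambda x: -x[4])

# Bottom of the list: the customers with no orders at all
union_rdd.filter(lambda x: x[4] == "NULL").take(5)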
DataFrame way
Now, let’s answer the question using the DataFrame approach.
customer_df = sqlcontext.createDataFrame(customer_rdd.map(lambda line: line.split("\t")),
                                         schema = customer.first().split("\t"))
orderHeader_df = sqlcontext.createDataFrame(orderHeader_rdd.map(lambda line: line.split("\t")),
                                            schema = orderHeader.first().split("\t"))
customer_df.printSchema()

root
 |-- CustomerID: string (nullable = true)
 |-- NameStyle: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- FirstName: string (nullable = true)
 |-- MiddleName: string (nullable = true)
 |-- LastName: string (nullable = true)
 |-- Suffix: string (nullable = true)
 |-- CompanyName: string (nullable = true)
 |-- SalesPerson: string (nullable = true)
 |-- EmailAddress: string (nullable = true)
 |-- Phone: string (nullable = true)
 |-- PasswordHash: string (nullable = true)
 |-- PasswordSalt: string (nullable = true)
 |-- rowguid: string (nullable = true)
 |-- ModifiedDate: string (nullable = true)
orderHeader_df.printSchema()

root
 |-- SalesOrderID: string (nullable = true)
 |-- RevisionNumber: string (nullable = true)
 |-- OrderDate: string (nullable = true)
 |-- DueDate: string (nullable = true)
 |-- ShipDate: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- OnlineOrderFlag: string (nullable = true)
 |-- SalesOrderNumber: string (nullable = true)
 |-- PurchaseOrderNumber: string (nullable = true)
 |-- AccountNumber: string (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- ShipToAddressID: string (nullable = true)
 |-- BillToAddressID: string (nullable = true)
 |-- ShipMethod: string (nullable = true)
 |-- CreditCardApprovalCode: string (nullable = true)
 |-- SubTotal: string (nullable = true)
 |-- TaxAmt: string (nullable = true)
 |-- Freight: string (nullable = true)
 |-- TotalDue: string (nullable = true)
 |-- Comment: string (nullable = true)
 |-- rowguid: string (nullable = true)
 |-- ModifiedDate: string (nullable = true)
We can see samples of those that have placed orders and those that have not, as below. Note that TotalDue is still a string here, so the descending sort is lexicographic rather than numeric.
joined = customer_df.join(orderHeader_df, 'CustomerID', how = "left")
joined.select(["CustomerID", 'FirstName', 'LastName', 'SalesOrderNumber', 'TotalDue'])\
      .orderBy("TotalDue", ascending = False).show(10, truncate = False)

+----------+-----------+--------+----------------+----------+
|CustomerID|FirstName  |LastName|SalesOrderNumber|TotalDue  |
+----------+-----------+--------+----------------+----------+
|29546     |Christopher|Beck    |SO71938         |98138.2131|
|29847     |David      |Hodgson |SO71774         |972.785   |
|29957     |Kevin      |Liu     |SO71783         |92663.5609|
|30072     |Andrea     |Thomsen |SO71776         |87.0851   |
|29796     |Jon        |Grande  |SO71797         |86222.8072|
|29929     |Jeffrey    |Kurtz   |SO71902         |81834.9826|
|29531     |Cory       |Booth   |SO71935         |7330.8972 |
|29932     |Rebecca    |Laszlo  |SO71898         |70698.9922|
|30033     |Vassar     |Stern   |SO71856         |665.4251  |
|29660     |Anthony    |Chor    |SO71796         |63686.2708|
+----------+-----------+--------+----------------+----------+
only showing top 10 rows
joined.select(["CustomerID", 'FirstName', 'LastName', 'SalesOrderNumber', 'TotalDue'])\
      .orderBy("TotalDue", ascending = True).show(10, truncate = False)

+----------+---------+-----------+----------------+--------+
|CustomerID|FirstName|LastName   |SalesOrderNumber|TotalDue|
+----------+---------+-----------+----------------+--------+
|29539     |Josh     |Barnhill   |null            |null    |
|29573     |Luis     |Bonifaz    |null            |null    |
|29865     |Lucio    |Iallo      |null            |null    |
|29978     |Ajay     |Manchepalli|null            |null    |
|451       |John     |Emory      |null            |null    |
|124       |Yuhong   |Li         |null            |null    |
|29580     |Richard  |Bready     |null            |null    |
|7         |Dominic  |Gash       |null            |null    |
|29525     |Teresa   |Atkinson   |null            |null    |
|29733     |Shannon  |Elliott    |null            |null    |
+----------+---------+-----------+----------------+--------+
only showing top 10 rows
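One more note on the ordering requirement: the question asks for customers without orders at the bottom of the list. Spark puts nulls last in a descending sort by default, but an ascending sort puts them first, as the output above shows. If you want an ascending sort with the nulls pushed to the end instead, here is a sketch assuming Spark 2.4+, where Column.asc_nulls_last is available:

# Ascending by TotalDue, but rows with a null TotalDue go to the end
joined.select(["CustomerID", 'FirstName', 'LastName', 'SalesOrderNumber', 'TotalDue'])\
      .orderBy(col("TotalDue").asc_nulls_last()).show(10, truncate = False)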
Running SQL Queries Programmatically
Below, I have shown samples of those that have made orders and those that have not using normal SQL commands.
orderHeader_df.createOrReplaceTempView("orderHeader_table")
customer_df.createOrReplaceTempView("customer_table")

sqlcontext.sql("SELECT c.CustomerID, c.FirstName, c.LastName, oh.SalesOrderID, cast(oh.TotalDue AS DECIMAL(10,4)) \
               FROM customer_table AS c LEFT JOIN orderHeader_table AS oh ON c.CustomerID = oh.CustomerID \
               ORDER BY TotalDue DESC LIMIT 10").show(truncate = False)
sqlcontext.sql("SELECT c.CustomerID, c.FirstName,c.LastName, oh.SalesOrderID,cast(oh.TotalDue AS DECIMAL(10,4)) \ FROM customer_table AS c LEFT JOIN orderHeader_table AS oh ON c.CustomerID = oh.CustomerID \ ORDER BY TotalDue ASC LIMIT 10").show(truncate = False)
Retrieve a list of customers with no address
A sales employee has noticed that Adventure Works does not have address information for all customers. You must write a query that returns a list of customer IDs, company names, contact names (first name and last name), and phone numbers for customers with no address stored in the database.
RDD way
customer_header = customer.first()
customer_rdd = customer.filter(lambda line: line != customer_header)
customer_address_header = customer_address.first()
customer_address_rdd = customer_address.filter(lambda line: line != customer_address_header)

customer_rdd1 = customer_rdd.map(lambda line: (line.split("\t")[0],    # CustomerID
                                               (line.split("\t")[3],   # FirstName
                                                line.split("\t")[5],   # LastName
                                                line.split("\t")[7],   # CompanyName
                                                line.split("\t")[9],   # EmailAddress
                                                line.split("\t")[10])  # Phone
                                               ))
customer_address_rdd1 = customer_address_rdd.map(lambda line: (line.split("\t")[0],   # CustomerID
                                                               line.split("\t")[1]))  # AddressID
First, let’s join the customer data to the customer address dataset. Then, we will filter the RDD to keep only the customers that do not have address information, as sketched after the join.
joined = customer_rdd1.leftOuterJoin(customer_address_rdd1)
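The filtering step itself is sketched below: after the left outer join, customers with no matching address record have None in the second slot of the value tuple.

# Keep only the customers whose join produced no address, then unpack the
# (FirstName, LastName, CompanyName, EmailAddress, Phone) tuple
no_address = joined.filter(lambda line: line[1][1] is None)\
                   .map(lambda line: line[1][0])
no_address.take(10)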
DataFrame way
After finding those that don’t have address information, I display 10 rows below.
customer_df = sqlcontext.createDataFrame(customer_rdd.map(lambda line: line.split("\t")),
                                         schema = customer.first().split("\t"))
customer_address_df = sqlcontext.createDataFrame(customer_address_rdd.map(lambda line: line.split("\t")),
                                                 schema = customer_address_header.split("\t"))
joined = customer_df.join(customer_address_df, 'CustomerID', 'left')
joined.filter(col("AddressID").isNull())\
      .select(['FirstName', 'LastName', 'CompanyName', 'EmailAddress', 'Phone'])\
      .show(10, truncate = False)
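As an aside, the same result can be had without the explicit isNull filter: Spark (2.0+) supports a left anti join, which keeps exactly the rows of the left table that have no match on the right. A sketch:

# left_anti returns customers with no matching row in the address table
no_address_df = customer_df.join(customer_address_df, 'CustomerID', how = "left_anti")
no_address_df.select(['FirstName', 'LastName', 'CompanyName', 'EmailAddress', 'Phone'])\
             .show(10, truncate = False)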
Running SQL Queries Programmatically
Using SQL also gives the same answers as the DataFrame approach shown above.
customer_address_df.createOrReplaceTempView("customer_address_table")
customer_df.createOrReplaceTempView("customer_table")

sqlcontext.sql("SELECT c.FirstName, c.LastName, c.CompanyName, c.EmailAddress, c.Phone \
               FROM customer_table AS c LEFT JOIN customer_address_table AS ca ON c.CustomerID = ca.CustomerID \
               WHERE ca.AddressID IS NULL").show(10, truncate = False)
This is enough for today. In the next part of the Spark RDDs vs DataFrames vs SparkSQL tutorial series, I will cover a different topic. If you have any questions or suggestions, feel free to drop them below.