In [1]:
import findspark
findspark.init('/usr/local/Cellar/apache-spark/2.3.2/libexec/')

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
In [2]:
sc = SparkSession.builder.appName("TestGA").getOrCreate()

# vor ga_sessions_201611* weitgehend category "(not set)"
# pyspark wird manchmal falsch interpretiert mit Java > 8

ga_data = sc.read.json('/Users/eduardoschumann/Documents/Projekte/E-CommerceAnalytics/ga_data/ga_sessions_2016*.json.gz')

Filtern von Hits die Transaktionen darstellen.

In [3]:
transactionsInfo = ga_data.select('fullVisitorId','visitId', 'visitNumber', 
                                  expr("totals.transactions").cast('integer'), 
                                  expr("totals.transactionRevenue").cast('integer'), 
                                  explode('hits').alias('hit'), 
                                  'hit.isExit',
                                  'hit.isInteraction', 
                                  'hit.product'
                                  ).fillna(0)\
                            .filter("transactions > 0")\
                            .filter(col('hit.transaction.transactionId').isNotNull())\
                            .filter(col('hit.isExit').isNull())\
                            .drop('hit')
In [4]:
transactionsInfo.printSchema()
root
 |-- fullVisitorId: string (nullable = true)
 |-- visitId: string (nullable = true)
 |-- visitNumber: string (nullable = true)
 |-- transactions: integer (nullable = true)
 |-- transactionRevenue: integer (nullable = true)
 |-- isExit: boolean (nullable = true)
 |-- isInteraction: boolean (nullable = true)
 |-- product: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- customDimensions: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- customMetrics: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- isClick: boolean (nullable = true)
 |    |    |-- isImpression: boolean (nullable = true)
 |    |    |-- localProductPrice: string (nullable = true)
 |    |    |-- localProductRevenue: string (nullable = true)
 |    |    |-- productBrand: string (nullable = true)
 |    |    |-- productListName: string (nullable = true)
 |    |    |-- productListPosition: string (nullable = true)
 |    |    |-- productPrice: string (nullable = true)
 |    |    |-- productQuantity: string (nullable = true)
 |    |    |-- productRevenue: string (nullable = true)
 |    |    |-- productSKU: string (nullable = true)
 |    |    |-- productVariant: string (nullable = true)
 |    |    |-- v2ProductCategory: string (nullable = true)
 |    |    |-- v2ProductName: string (nullable = true)

In [5]:
productInfo = transactionsInfo.select('fullVisitorId',
                        explode('product').alias('sproduct'),
                        expr('sproduct.productQuantity').cast('integer'),
                        expr('sproduct.productRevenue').cast('integer'),
                        'sproduct.productSKU',
                        'sproduct.v2ProductCategory',
                        'sproduct.v2ProductName'
                        ).filter(~((col('v2ProductCategory') == '${productitem.product.origCatName}')|
                            (col('v2ProductCategory') == '(not set)'))).\
                        fillna({'productQuantity':0, 'productRevenue':0 }).\
                        filter('productQuantity > 1').\
                        drop('sproduct')
In [6]:
productInfo.show(3, truncate=False)
+-------------------+---------------+--------------+--------------+-----------------+---------------------------+
|fullVisitorId      |productQuantity|productRevenue|productSKU    |v2ProductCategory|v2ProductName              |
+-------------------+---------------+--------------+--------------+-----------------+---------------------------+
|8813436791449006259|2              |24980000      |GGOEGHPB071610|Apparel          |Google Twill Cap           |
|788289060362109435 |11             |532890000     |GGOEGBRB073899|Backpacks        |Google Laptop Tech Backpack|
|4294172539120874565|20             |203800000     |GGOEGOCT019199|Office           |Red Spiral Google Notebook |
+-------------------+---------------+--------------+--------------+-----------------+---------------------------+
only showing top 3 rows

In [7]:
productbyVisitor = productInfo.groupBy('fullVisitorId', 'productSKU').\
    agg(sum('productQuantity').alias('quantity'),
        first(col('v2ProductCategory')).alias('category'),
        first(col('productRevenue')).alias('revenue'),
        first(col('v2ProductName')).alias('product'),
       )
In [12]:
collected = productbyVisitor.groupBy('fullVisitorId').agg(collect_list("product").alias("products"))

make a struct out of product cols and collect by user

In [13]:
collected.write.json('productCollected.json')