Imagine a huge Wi-Fi network with hundreds of access points (APs) and thousands of users. The network is managed by a controller, which has full visibility of all events, such as a new user connection or disconnection. In my case, the controller reports all the events to a Hadoop Distributed File System (HDFS) in raw format.

The problem

Some important questions to keep in mind:

  • How many active users do I have? How often are they using the network? Beyond that, could I identify different groups of users in terms of network utilization?
  • What is the overall performance of my network? Do I have any non-operational Access Points? And on the opposite end, which are the most popular APs?
  • Are there anomalies in the network? Any kind of fraud?

I know it may sound very ambitious, so for now I will focus on improving the data quality to make these goals achievable.

The goal

My goal is to transform the source raw data into a more valuable dataset.

For this kind of task I feel very comfortable with pandas, but given the amount of data to process, a non-distributed approach is impossible. That's why I'm going to use Spark DataFrames to take advantage of parallelization.

Exploring the dataset

In this case, I'm going to deal with daily files with the following structure:

$ head -n 1 20160401_events.txt
ts;mac;ap_name;event_type;username;ipaddress;optional_field

I always find it more convenient to see a single record in column style with field indexes:

$ tail -n 1 20160401_events.txt | sed "s/;/\n/g" | cat -n
 1  2016-05-06 14:43:48.766287  # timestamp 
 2  112233445566                # mac 
 3  AP1031                      # ap_name 
 4  authentication              # event_type 
 5  foo@bar.com                 # username 
 6  192.168.1.25                # ipaddress 
 7  15                          # optional_field (not relevant)

Importing the data into a Spark DataFrame with spark-csv

The spark-csv module makes it pretty straightforward to import all the daily files into a single DataFrame:

from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import *

sqlContext = SQLContext(sc)

# The custom schema needed to parse the input files
# The timestamp will be automatically parsed to a
# TimestampType
customSchema = StructType([ \
        StructField('ts', TimestampType(), True), \
        StructField('mac', StringType(), True), \
        StructField('ap', StringType(), True), \
        StructField('type', StringType(), True), \
        StructField('username', StringType(), True), \
        StructField('ip', StringType(), True), \
        StructField('optional', StringType(), True) \
        ])

# Read the data into a Spark DataFrame. With the wildcard '*'
# we import all the daily files from the monthly folder
df = sqlContext.read \
        .format('com.databricks.spark.csv') \
        .options(delimiter=';') \
        .load('/wiminer/201602/*_events.txt',\
              schema=customSchema)

# Make sure everything went well
df.show(2)
+-------------+------------+-----+-------+------------+------------+
|           ts|         mac|   ap|   type|    username|          ip|
+-------------+------------+-----+-------+------------+------------+
|2016-02-01...|112233445566|AP976|auth...|user5566@...|192.168.1.99|
|2016-02-01...|998877665544|AP976|asso...|user5544@...|192.168.1.98|
+-------------+------------+-----+-------+------------+------------+
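
A quick printSchema() confirms that the custom schema was applied and that the ts column was parsed as a timestamp instead of a plain string:

# Sanity check: 'ts' should show up as a timestamp, not a string
df.printSchema()
# root
#  |-- ts: timestamp (nullable = true)
#  |-- mac: string (nullable = true)
#  ...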

Generating more features

By using select in combination with the built-in functions of the pyspark.sql.functions package, it's easy to create a new DataFrame with more features:

rich_df = df.select(\
  df.ap,\
  df.mac,\
  dayofmonth(df.ts).alias('day'),\
  hour(df.ts).alias('hour'),\
  floor(hour(df['ts']) / 4).alias('hour_range'))\
  .cache()

rich_df.show(2)
+------------+------------+---+----+----------+
|          ap|         mac|day|hour|hour_range|
+------------+------------+---+----+----------+
|       AP976|112233445566|  1|   0|         0|
|       AP976|998877665544|  1|   0|         0|
+------------+------------+---+----+----------+
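
Before aggregating, a quick count per hour range gives a feel for how the new feature is distributed. This is just a sanity check, not part of the final pipeline:

# How many events fall into each of the six 4-hour ranges
rich_df.groupBy('hour_range').count().orderBy('hour_range').show()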

Global stats by Access Point

ap_df = rich_df.groupby('ap').agg(\
  countDistinct('day').alias('total_days'),\
  count('mac').alias('total_events'),\
  countDistinct('mac').alias('total_users'))

ap_df.show(2)
+-------------+----------+------------+-----------+
|           ap|total_days|total_events|total_users|
+-------------+----------+------------+-----------+
|       AP1031|        25|         555|        111|
|       AP1353|        30|         399|         65|
+-------------+----------+------------+-----------+
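
These aggregates already touch on some of the initial questions. For instance, the most popular APs in terms of distinct users can be listed by simply sorting the DataFrame:

# Top 5 APs by number of distinct users seen during the month
ap_df.orderBy(desc('total_users')).show(5)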

More features: hour range counts

The ap_df has lost all the hourly information. My goal now is to know at which hours the users connect to each AP.

# Filter for each hour range, then group by ap and mac, and
# finally count how many distinct days this user connected
# to this ap during the specific hour range.
hr0 = rich_df.filter(rich_df.hour_range == 0)\
 .groupby('ap', 'mac')\
 .agg(countDistinct('day').alias('hour_range0'))

hr1 = rich_df.filter(rich_df.hour_range == 1)\
 .groupby('ap', 'mac')\
 .agg(countDistinct('day').alias('hour_range1'))

hr2 = rich_df.filter(rich_df.hour_range == 2)\
 .groupby('ap', 'mac')\
 .agg(countDistinct('day').alias('hour_range2'))

hr3 = rich_df.filter(rich_df.hour_range == 3)\
 .groupby('ap', 'mac')\
 .agg(countDistinct('day').alias('hour_range3'))

hr4 = rich_df.filter(rich_df.hour_range == 4)\
 .groupby('ap', 'mac')\
 .agg(countDistinct('day').alias('hour_range4'))

hr5 = rich_df.filter(rich_df.hour_range == 5)\
 .groupby('ap', 'mac')\
 .agg(countDistinct('day').alias('hour_range5'))

# Combine in a single dataframe all the data
hour_ranges_df = hr0\
 .join(hr1, [ 'ap', 'mac'], how='outer')\
 .join(hr2, [ 'ap', 'mac'], how='outer')\
 .join(hr3, [ 'ap', 'mac'], how='outer')\
 .join(hr4, [ 'ap', 'mac'], how='outer')\
 .join(hr5, [ 'ap', 'mac'], how='outer')\
 .na.fill(0)

# Group by ap and average each column. Each feature ends up
# between 0 and 31 (31 would mean every user of that AP
# connected every day of the month in that hour range)
hour_ranges_df = hour_ranges_df.groupby('ap').agg(\
 avg('hour_range0').alias('avg_days_hr0'),\
 avg('hour_range1').alias('avg_days_hr1'),\
 avg('hour_range2').alias('avg_days_hr2'),\
 avg('hour_range3').alias('avg_days_hr3'),\
 avg('hour_range4').alias('avg_days_hr4'),\
 avg('hour_range5').alias('avg_days_hr5')\
 )

# Add these new features to the ap_df
ap_df = ap_df.join(hour_ranges_df, 'ap', how='outer')
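
As a side note, if you are on Spark 1.6 or later, the pivot() method on grouped data can replace the six filters and five joins above in a single pass. A minimal sketch, assuming that version is available (the pivot_df and hour_ranges_alt names are just for illustration):

# Alternative (requires Spark >= 1.6): pivot the hour_range values
# into columns in one shot instead of filtering and joining six times
pivot_df = rich_df.groupby('ap', 'mac')\
        .pivot('hour_range', [0, 1, 2, 3, 4, 5])\
        .agg(countDistinct('day'))\
        .na.fill(0)

# Average each pivoted column per AP, reusing the same column names
hour_ranges_alt = pivot_df.groupby('ap').agg(\
        *[avg(pivot_df[str(i)]).alias('avg_days_hr%d' % i) for i in range(6)])

The result should match the averages computed above, while avoiding six intermediate DataFrames.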

Final dataframe

ap_df.show(2)
+-----+----------+------------+-----------+------------+...+------------+
|   ap|total_days|total_events|total_users|avg_days_hr0|...|avg_days_hr5|
+-----+----------+------------+-----------+------------+...+------------+
|AP101|        30|       16463|       2539|       0.236|...|       0.458|
|AP102|        14|       18576|       3052|       0.085|...|       0.051|
+-----+----------+------------+-----------+------------+...+------------+

As in pandas, the describe() method comes in handy for basic exploratory data analysis:

ap_df.describe().show()
+-------+-----------+-------------+------------+-------------+...+
|summary| total_days| total_events| total_users| avg_days_hr0|...|
+-------+-----------+-------------+------------+-------------+...+
|  count|       1567|         1567|        1567|         1567|...|
|   mean|      26.84|     18699.76|     2470.54|        0.098|...|
| stddev|       5.88|     19691.72|     2883.76|        0.101|...|
|    min|          1|            1|           1|        0.000|...|
|    max|         29|       147049|       19501|         1.51|...|
+-------+-----------+-------------+------------+-------------+...+
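
With these per-AP stats in place, a first cut at the question about non-operational Access Points is a simple filter. A sketch below; the threshold of 10 events is an arbitrary value for illustration:

# APs that barely registered any activity during the month
# (the threshold is arbitrary; tune it to your network)
ap_df.filter(ap_df.total_events < 10)\
     .select('ap', 'total_days', 'total_events', 'total_users')\
     .show()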

Conclusion

As shown, Apache Spark is very powerful for this kind of data munging at scale. The source raw data has been efficiently transformed into a high-value dataset, ready to be analyzed in depth or to be used as the input to a predictive / anomaly detection model. To summarize, the key features used are:

  • spark-csv to parse the input files and convert them into a DataFrame.
  • select in combination with the built-in pyspark.sql.functions functions to create more features.
  • Aggregations and joins.
  • Basic exploratory data analysis with describe().
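
Finally, the enriched ap_df can be persisted back to HDFS so it is ready for downstream analysis or model training. A minimal sketch, using a hypothetical output path:

# Write the per-AP feature set back to HDFS in a columnar format
# ('/wiminer/features/201602_ap' is a hypothetical path)
ap_df.write.mode('overwrite').parquet('/wiminer/features/201602_ap')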