# Source code for markovclick.preprocessing

"""
Functions for preprocessing clickstream datasets
"""

from uuid import uuid4
from multiprocessing import Process, Queue
from datetime import timedelta
import numpy as np
import pandas as pd


class Sessionise:
    """
    Class with functions to sessionise a pandas DataFrame containing
    clickstream data.
    """

    def __init__(self, df, unique_id_col: str, datetime_col: str,
                 session_timeout: int = 30) -> None:
        """
        Instantiates object of ``Sessionise`` class.

        Args:
            df (pd.DataFrame): ``pandas`` DataFrame object containing
                clickstream data. Must contain at least a timestamp column
                and a unique identifier column such as cookie ID.
            unique_id_col (str): Column name of unique identifier,
                e.g. ``cookie_id``.
            datetime_col (str): Column name of timestamp column.
            session_timeout (int, optional): Defaults to 30. Maximum time
                in minutes after which a session is broken.
        """
        # _df must be set before the validating property setters run,
        # since they inspect self.df.columns / dtypes.
        self._df = df
        self.unique_id_col = unique_id_col
        self.datetime_col = datetime_col
        self._session_timeout = session_timeout
        # Rolling session UUID used by the single-process sessionisation.
        self.curr_uniq_id = str(uuid4())
        # The only columns needed to decide whether a click opens a session.
        self.columns = [self.unique_id_col, 'prev_uniq_id',
                        'session_boundary']

    @property
    def df(self):
        """ Provides access to ``df`` attribute """
        return self._df

    @property
    def unique_id_col(self):
        """ Provides access to ``unique_id_col`` attribute """
        return self.__unique_id_col

    @unique_id_col.setter
    def unique_id_col(self, name: str):
        """ Sets value for ``unique_id_col`` attribute """
        # Validate the type first so a non-string argument gets the
        # type-related message rather than the membership one.
        if not isinstance(name, str):
            raise ValueError("Unique ID column name should be string.")
        if name not in self.df.columns:
            raise ValueError("Unique ID column name not in dataframe.")
        self.__unique_id_col = name

    @property
    def datetime_col(self):
        """ Provides access to ``datetime_col`` attribute """
        return self.__datetime_col

    @datetime_col.setter
    def datetime_col(self, name: str):
        """ Sets value for ``datetime_col`` attribute """
        # Check the column's dtype explicitly: passing the Series itself to
        # np.issubdtype is unreliable across numpy versions.
        if isinstance(name, str) and np.issubdtype(
                self.df[name].dtype, np.datetime64):
            self.__datetime_col = name
        else:
            raise TypeError("Datetime column name should be string referring"
                            " to a datetime column.")

    @property
    def session_timeout(self):
        """ Provides access to ``session_timeout`` attribute """
        return self._session_timeout

    def _add_session_boundaries(self):
        """
        Adds a column to denote the session boundaries in clickstream data.

        A boundary (``session_boundary`` is True) marks a click whose gap
        to the previous click exceeds ``session_timeout`` minutes.
        """
        # BUG FIX: sort_values returns a new DataFrame; the original call
        # discarded the result, so rows were never actually sorted before
        # computing the previous-ID and time-difference columns.
        self._df = self._df.sort_values(
            by=[self.unique_id_col, self.datetime_col]
        )
        self._df['prev_uniq_id'] = self._df.shift(1)[self.unique_id_col]
        self._df['time_diff'] = self._df[self.datetime_col].diff(1)
        self._df['time_diff'] = self._df['time_diff'].fillna(timedelta(0))
        # Vectorised comparison; equivalent to subtracting the timeout and
        # testing the remainder against timedelta(0).
        self._df['session_boundary'] = self._df['time_diff'] > timedelta(
            minutes=self.session_timeout
        )

    def _get_or_create_uuid(self, row) -> str:
        """
        Provides a new or returns previous session UUID depending on
        criteria for setting a new session boundary.

        Args:
            row: Row in DataFrame of clickstream dataset

        Returns:
            str: Unique session UUID
        """
        curr_uniq_id = row[self.unique_id_col]
        prev_anon_id = row['prev_uniq_id']
        boundary = row['session_boundary']
        # Truthiness rather than `is True`: the column may hold numpy
        # bools, and `np.bool_(True) is True` evaluates to False.
        if boundary or curr_uniq_id != prev_anon_id:
            self.curr_uniq_id = str(uuid4())
        return self.curr_uniq_id

    def _create_partitions(self, partitions: int) -> list:
        """
        Splits clickstream dataset into specified number of partitions,
        based on the unique IDs (e.g. cookie ID) in the DataFrame.

        Args:
            partitions (int): Number of partitions to split into

        Returns:
            list: List of non-empty arrays of unique IDs, one per partition
        """
        uniq_ids = self.df[self.unique_id_col].unique()
        # Drop empty chunks produced when there are fewer unique IDs than
        # requested partitions.
        chunks = np.array_split(uniq_ids, partitions)
        return [chunk for chunk in chunks if chunk.size > 0]

    def _assign_sessions_parallel(self, df, partition: list, queue):
        """
        Assigns sessions to partition of DataFrame, created using list of
        unique IDs provided in partition argument.

        Args:
            df (pd.DataFrame): DataFrame containing clickstream data
            partition (list): List of unique IDs to subset DataFrame
            queue: multiprocessing.Queue object to add results to
        """
        # Copy the subset so assigning the new column does not mutate (or
        # warn about mutating) a slice of the parent DataFrame.
        _df = df.loc[df[self.unique_id_col].isin(partition), :].copy()
        curr_session_uuid = str(uuid4())

        def get_or_create_uuid(row) -> str:
            # Per-partition variant of _get_or_create_uuid: state lives in
            # the closure because it runs in a separate process.
            nonlocal curr_session_uuid
            curr_uniq_id = row[self.unique_id_col]
            prev_anon_id = row['prev_uniq_id']
            boundary = row['session_boundary']
            if boundary or curr_uniq_id != prev_anon_id:
                curr_session_uuid = str(uuid4())
            return curr_session_uuid

        _df.loc[:, 'session_uuid'] = _df.apply(get_or_create_uuid, axis=1)
        queue.put(_df)

    def assign_sessions(self, n_jobs: int = 1):
        """
        Assigns unique session IDs to individual clicks that form the
        sessions. Supports parallel processing through setting ``n_jobs``
        to higher than 1.

        Args:
            n_jobs (int, optional): Defaults to 1. If 2 or higher, enables
                parallel processing.

        Returns:
            pd.DataFrame: Returns sessionised DataFrame, with session IDs
                stored in ``session_uuid`` column.
        """
        self._add_session_boundaries()
        if n_jobs == 1:
            self._df.loc[:, 'session_uuid'] = self.df[self.columns].apply(
                lambda row: self._get_or_create_uuid(row), axis=1
            )
            return self.df
        if n_jobs > 1:
            partitions = self._create_partitions(n_jobs)
            queue = Queue()
            processes = [
                Process(
                    target=self._assign_sessions_parallel,
                    args=(self.df, partition, queue)
                )
                for partition in partitions
            ]
            for process in processes:
                process.start()
            # Drain the queue before joining: on some platforms join()
            # deadlocks if children are still blocked writing to the queue.
            results = [queue.get() for process in processes]
            for process in processes:
                process.join()
            self._df = pd.concat(results, axis=0)
            # Partitions arrive in arbitrary order; restore original order.
            self._df = self._df.sort_index()
            return self.df