mobts.preprocessing package

Submodules

mobts.preprocessing.cleaning module

Cleaning the dataset for further operations

This module contains:

  • transforming the hourly dataset to a daily dataset

  • removing measurement errors for data with hourly frequency per counter

  • removing measurement errors for data with daily frequency per counter

  • a wrapper for removing measurement errors per counter

  • a function for removing measurement errors for the entire network

mobts.preprocessing.cleaning._aggregate_hourly_to_daily(df: DataFrame, cols: ColumnsConfig = ColumnsConfig(counter='counter', timestamp='timestamp', count='count', weekday='weekday', week_num='week_num', how='how', hour='hour', date='date')) DataFrame

Aggregate hourly frequency to daily frequency

  • df: hourly dataset

  • cols: Column config

  • DataFrame with daily frequency
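The aggregation step can be pictured with a short pandas sketch. It is an illustration, not the package's implementation: the function name `aggregate_hourly_to_daily_sketch`, the use of a sum, and the `min_count=1` choice (so that a day with only missing hours stays NaN) are assumptions. The column names follow the `ColumnsConfig` defaults shown in the signature.

```python
import numpy as np
import pandas as pd


def aggregate_hourly_to_daily_sketch(df, counter="counter", timestamp="timestamp", count="count"):
    """Sum hourly counts into one row per counter and calendar day (illustrative only)."""
    out = df.copy()
    # Truncate each timestamp to midnight so all hours of a day share one key.
    out["date"] = pd.to_datetime(out[timestamp]).dt.normalize()
    return (
        out.groupby([counter, "date"])[count]
        .sum(min_count=1)  # assumption: a day with no valid hours stays NaN, not 0
        .reset_index()
    )


# Two days of hourly data for one counter; the second day is entirely missing.
hours = pd.Timestamp("2024-01-01") + pd.to_timedelta(np.arange(48), unit="h")
hourly = pd.DataFrame({"counter": "A", "timestamp": hours, "count": [1.0] * 24 + [np.nan] * 24})
daily = aggregate_hourly_to_daily_sketch(hourly)
```

Keeping all-missing days as NaN, rather than 0, matters here because the daily cleaning step treats 0 as a suspected measurement error.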

mobts.preprocessing.cleaning._remove_measurement_errors_all_network(df_network: DataFrame, data_is_hourly: bool = True, change_to_daily: bool = False, cols: ColumnsConfig = ColumnsConfig(counter='counter', timestamp='timestamp', count='count', weekday='weekday', week_num='week_num', how='how', hour='hour', date='date'), cfg: PreprocessConfig = PreprocessConfig(low_rel_daily=0.01, low_abs_daily=5, low_run_min_daily=2, zero_rate_max=0.05, zero_run_min=6, island_max_len=6, surround_min_len=12)) DataFrame

Applying the cleaning function on the entire network

  • df_network: the DataFrame containing all counters

  • data_is_hourly: whether the data is hourly (daily otherwise)

  • change_to_daily: whether hourly data is aggregated to daily data

  • cols: column config

  • cfg: config for preprocessing

  • DataFrame of all counters where counts suspected of being measurement errors are set to NaN

  • number of observations that have been changed to NaN by the function

  • number of counters that have been affected by the cleaning function

mobts.preprocessing.cleaning._remove_measurement_errors_daily(df_counter: DataFrame, cols: ColumnsConfig = ColumnsConfig(counter='counter', timestamp='timestamp', count='count', weekday='weekday', week_num='week_num', how='how', hour='hour', date='date'), cfg: PreprocessConfig = PreprocessConfig(low_rel_daily=0.01, low_abs_daily=5, low_run_min_daily=2, zero_rate_max=0.05, zero_run_min=6, island_max_len=6, surround_min_len=12)) DataFrame

Remove measurement errors for data with daily frequency per counter

  • df_counter: daily dataset of a single counter

  • cols: column config

  • cfg: config for preprocessing

  • Single counter DataFrame where daily counts suspected of being measurement errors are set to NaN

  • The process removes counts under the following conditions:
    1. Days recorded with 0 observations are set to NaN.

    2. A threshold is calculated for low-observation noise. This threshold is the maximum of a pre-set ‘low_abs_daily’ count and ‘low_rel_daily’ (in %) of a baseline, defined as the median of all counts. Observations under this threshold are considered “low counts”. If a run of low counts persists for longer than ‘low_run_min_daily’, those observations are set to NaN as well.

  • Because daily data is aggregated, the removal of 0s and low counts is applied more generously than for hourly data.
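The two daily rules can be sketched on a plain pandas Series as follows. This is a minimal illustration under stated assumptions, not the package's code: the function name is hypothetical, ‘low_rel_daily’ is treated as a fraction of the baseline, the baseline median is taken after zeros have been removed, and "longer than" is read as a strict comparison.

```python
import numpy as np
import pandas as pd


def remove_daily_errors_sketch(counts, low_rel_daily=0.01, low_abs_daily=5, low_run_min_daily=2):
    """Set zero days and persistent low-count runs to NaN (illustrative only)."""
    s = counts.astype(float).copy()
    # Rule 1: days with a count of 0 become NaN.
    s[s == 0] = np.nan
    # Rule 2: threshold = max(absolute floor, relative share of the median baseline).
    baseline = s.median()  # assumption: median of the remaining (non-NaN) counts
    threshold = max(low_abs_daily, low_rel_daily * baseline)
    is_low = s < threshold
    # Label consecutive runs of equal flags, then measure the length of each run.
    run_id = is_low.ne(is_low.shift()).cumsum()
    run_len = is_low.groupby(run_id).transform("size")
    # Assumption: only runs strictly longer than low_run_min_daily are removed.
    s[is_low & (run_len > low_run_min_daily)] = np.nan
    return s


daily_counts = pd.Series([100, 120, 0, 110, 2, 3, 1, 130, 4, 125])
cleaned = remove_daily_errors_sketch(daily_counts)
```

In this example the zero day and the three-day run of low counts are removed, while the isolated low count of 4 is kept because its run is too short.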

mobts.preprocessing.cleaning._remove_measurement_errors_hourly(df_counter: DataFrame, cols: ColumnsConfig = ColumnsConfig(counter='counter', timestamp='timestamp', count='count', weekday='weekday', week_num='week_num', how='how', hour='hour', date='date'), cfg: PreprocessConfig = PreprocessConfig(low_rel_daily=0.01, low_abs_daily=5, low_run_min_daily=2, zero_rate_max=0.05, zero_run_min=6, island_max_len=6, surround_min_len=12)) DataFrame

Remove measurement errors for data with hourly frequency per counter

  • df_counter: hourly dataset of a single counter

  • cols: Column config

  • cfg: config for preprocessing

  • Single counter DataFrame where hourly counts suspected of being measurement errors are set to NaN

  • The process removes counts under the following conditions:
    1. If a counter has recorded 0 counts for an entire day, the entire day is set to NaN.

    2. For an hour’s zero count to be eligible for removal, it has to be outside night hours, and the rate of 0 observations for that hour needs to be lower than ‘zero_rate_max’. Additionally, the median observation of the hour has to be at least one standard deviation higher than zero.

    3. Of the zeros that meet the conditions in (2), only those that are consecutive for ‘zero_run_min’ hours are set to NaN.

    4. Observation islands shorter than ‘island_max_len’ that are surrounded by runs of zero or NaN observations longer than ‘surround_min_len’ are also set to NaN.

  • Given the noise and variation in hourly data, the strict conditions above are meant to prevent unnecessary loss of data.
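Conditions 1 and 3 both come down to finding runs of zeros, which the sketch below illustrates on a Series with an hourly DatetimeIndex. It is a simplified illustration, not the package's implementation: the function names are hypothetical, the eligibility checks of condition 2 and the island rule of condition 4 are left out, and a run is removed when it lasts at least ‘zero_run_min’ hours (an assumption about how the bound is applied).

```python
import numpy as np
import pandas as pd


def run_lengths(mask):
    """For each element, the length of the run of identical flags it belongs to."""
    run_id = mask.ne(mask.shift()).cumsum()
    return mask.groupby(run_id).transform("size")


def remove_hourly_zero_runs_sketch(counts, zero_run_min=6):
    """Set all-zero days and long zero runs to NaN (illustrative only)."""
    s = counts.astype(float).copy()
    is_zero = s == 0
    # Condition 1: a calendar day whose hours are all 0 is removed entirely.
    day = s.index.normalize()
    all_zero_day = is_zero.groupby(day).transform("all")
    s[all_zero_day] = np.nan
    # Condition 3: zeros are removed only when they form a long enough run.
    # Assumption: "long enough" means at least zero_run_min consecutive hours.
    s[is_zero & (run_lengths(is_zero) >= zero_run_min)] = np.nan
    return s


# Three days of hourly data: day 2 is all zeros, day 3 has a 6-hour zero run
# and one isolated zero.
index = pd.Timestamp("2024-01-01") + pd.to_timedelta(np.arange(72), unit="h")
values = np.full(72, 10.0)
values[24:48] = 0
values[50:56] = 0
values[60] = 0
cleaned_hourly = remove_hourly_zero_runs_sketch(pd.Series(values, index=index))
```

The isolated zero at position 60 survives, which reflects the intent stated above: a single quiet hour is plausible, whereas a long stretch of zeros is more likely a sensor fault.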

mobts.preprocessing.cleaning._remove_measurement_errors_wrapper_counter(df_counter: DataFrame, data_is_hourly: bool, change_to_daily: bool, cols: ColumnsConfig = ColumnsConfig(counter='counter', timestamp='timestamp', count='count', weekday='weekday', week_num='week_num', how='how', hour='hour', date='date'), cfg: PreprocessConfig = PreprocessConfig(low_rel_daily=0.01, low_abs_daily=5, low_run_min_daily=2, zero_rate_max=0.05, zero_run_min=6, island_max_len=6, surround_min_len=12)) DataFrame

Wrapper for choosing the measurement error remover by frequency for a single counter

  • df_counter: dataset of a single counter (hourly or daily)

  • data_is_hourly: whether the data is hourly (daily otherwise)

  • change_to_daily: whether hourly data is aggregated to daily data

  • cols: column config

  • cfg: config for preprocessing

  • Single counter DataFrame where counts suspected of being measurement errors are set to NaN

mobts.preprocessing.outliers module

Calculating and assigning an outlier score to each observation

This module contains:

  • calculating outlier score for daily data

  • calculating outlier score for hourly data

  • applying outlier score to all counters in the network

mobts.preprocessing.outliers._calculate_outlier_score(df: DataFrame, data_is_hourly: bool = True, change_to_daily: bool = False, cols: ColumnsConfig = ColumnsConfig(counter='counter', timestamp='timestamp', count='count', weekday='weekday', week_num='week_num', how='how', hour='hour', date='date'), stl_cfg: STLConfig = STLConfig(period=28, robust=False)) DataFrame

Calculating outlier score for each observation based on temporal frequency

  • df: complete dataset

  • data_is_hourly: whether the data is hourly (daily otherwise)

  • change_to_daily: whether hourly data is aggregated to daily data

  • cols: Column config

  • stl_cfg: config for STL

  • DataFrame of all observations with their corresponding outlier score

the choice of outlier threshold downstream in the code

mobts.preprocessing.outliers._calculate_outlier_score_counter_daily(df_counter_daily: DataFrame, cols: ColumnsConfig = ColumnsConfig(counter='counter', timestamp='timestamp', count='count', weekday='weekday', week_num='week_num', how='how', hour='hour', date='date'), cfg: STLConfig = STLConfig(period=28, robust=False)) DataFrame

Calculating outlier score for daily data

  • df_counter_daily: daily dataset for a single counter

  • cols: Column config

  • cfg: STL config

  • DataFrame of single counter with aggregate daily observations, and corresponding outlier scores

  • To avoid assigning high scores to observations that are prevalent, the outlier score is increased for observations at the 99th quantile and decreased for observations between the 25th and 75th quantiles

mobts.preprocessing.outliers._calculate_outlier_score_counter_hourly(df_couter_hourly: DataFrame, cols: ColumnsConfig = ColumnsConfig(counter='counter', timestamp='timestamp', count='count', weekday='weekday', week_num='week_num', how='how', hour='hour', date='date')) DataFrame

Calculating outlier score for hourly data

  • df_counter_hourly: hourly dataset for a single counter

  • cols: Column config

  • DataFrame of single counter with aggregate hourly observations, and corresponding outlier scores

STL is computationally heavy on hourly data, with a period of 168 hours. Furthermore,

  • To avoid assigning high scores to observations that are prevalent, the outlier score is increased for observations at the 99th quantile and decreased for observations between the 25th and 75th quantiles

mobts.preprocessing.pipeline module

The pipeline for preprocessing subpackage

This module contains:

  • the function for the first stage of preprocessing, which includes:
    • standardizing the input given by the user

    • removing observations with undefined counter names (optional)

    • aggregating hourly to daily frequency (optional)

    • removing counters with sparse observations

    • calculating outlier scores for each observation

  • the function for applying the user’s threshold, flagging outliers, and replacing them with NaN

  • the ‘preprocess’ class containing a .run function that contains:
    • running the stage 1 procedure explained above

    • flagging and replacing outliers with NaN using the given threshold

    • the plot function for visualizing detected outliers based on the given threshold

The main class is the intended entry point. However, the user is warned that the default thresholds were set based on the case-study data and might not be suitable for other datasets. The warning suggests running ‘run_preprocess_stage_1’ first, then tuning the threshold while monitoring the outlier detection quality via the ‘plot_outliers’ function. The final dataset can then be produced directly using the ‘apply_threshold’ function.

mobts.preprocessing.pipeline.apply_threshold(df_scored: DataFrame, data_is_hourly: str, change_to_daily: str, cfg: PipelineConfig = PipelineConfig(cols=ColumnsConfig(counter='counter', timestamp='timestamp', count='count', weekday='weekday', week_num='week_num', how='how', hour='hour', date='date'), sparse=SparsityConfig(drop_sparse_counters=True, sparse_threshold=0.5), preprocess=PreprocessConfig(low_rel_daily=0.01, low_abs_daily=5, low_run_min_daily=2, zero_rate_max=0.05, zero_run_min=6, island_max_len=6, surround_min_len=12), stl=STLConfig(period=28, robust=False), outliers=OutlierConfig(threshold_daily=20, threshold_hourly=45), plot=PlotConfig(ncols=3, figsize_width=15, min_fig_height=10, height_per_row=3, linewidth_d=0.5, linewidth_h=0.3, marker_size=10, x_label_rotation=30, max_stations=None)), threshold: float | None = None) DataFrame

Applying the given threshold and replacing the outliers with NaN

  • df_scored: DataFrame with outlier scores

  • data_is_hourly: whether the data is hourly (daily otherwise)

  • change_to_daily: whether hourly data is aggregated to daily data

  • cfg: pipeline config containing all configs

  • threshold: outlier threshold set by user (defaults still set in config)

  • Cleaned final dataset with measurement errors removed and replaced by NaN
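Conceptually, applying a threshold means flagging every observation whose score exceeds it and masking the corresponding count. The sketch below shows that idea in pandas. It is not the package's implementation: the function name and the `outlier_score` and `is_outlier` column names are hypothetical, and the strict "greater than" comparison is an assumption.

```python
import numpy as np
import pandas as pd


def apply_threshold_sketch(df_scored, threshold, count="count", score="outlier_score"):
    """Flag scores above the threshold and replace their counts with NaN (illustrative only)."""
    out = df_scored.copy()
    # A missing score compares as False, so unscored rows are never flagged.
    out["is_outlier"] = out[score] > threshold
    # Keep counts where the row is not an outlier; everything else becomes NaN.
    out[count] = out[count].where(~out["is_outlier"])
    return out


scored = pd.DataFrame({"count": [10, 999, 12, 11], "outlier_score": [1.0, 50.0, np.nan, 20.0]})
result = apply_threshold_sketch(scored, threshold=20)
```

Keeping the flag column next to the masked counts makes it easy to inspect how many observations a candidate threshold removes before settling on a value.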

class mobts.preprocessing.pipeline.preprocess(cfg: PipelineConfig = PipelineConfig(cols=ColumnsConfig(counter='counter', timestamp='timestamp', count='count', weekday='weekday', week_num='week_num', how='how', hour='hour', date='date'), sparse=SparsityConfig(drop_sparse_counters=True, sparse_threshold=0.5), preprocess=PreprocessConfig(low_rel_daily=0.01, low_abs_daily=5, low_run_min_daily=2, zero_rate_max=0.05, zero_run_min=6, island_max_len=6, surround_min_len=12), stl=STLConfig(period=28, robust=False), outliers=OutlierConfig(threshold_daily=20, threshold_hourly=45), plot=PlotConfig(ncols=3, figsize_width=15, min_fig_height=10, height_per_row=3, linewidth_d=0.5, linewidth_h=0.3, marker_size=10, x_label_rotation=30, max_stations=None)))

Bases: object

This class consists of running the stage 1 procedure explained above, then flagging and replacing outliers with NaN using the given threshold.

  • df_raw: raw data provided by user

  • data_is_hourly: whether the data is hourly (daily otherwise)

  • change_to_daily: whether hourly data is aggregated to daily data

  • counter_col: counter column’s name fed by user

  • timestamp_col: timestamp column’s name fed by user

  • count_col: count column’s name fed by user

  • threshold: outlier threshold set by user (defaults still set in config)

  • cfg: pipeline config containing all configs

  • Cleaned final dataset with measurement errors removed and replaced by NaN

  • Here, the function is run either with the default thresholds or with a threshold PREVIOUSLY OPTIMIZED by the user

plot_outliers(threshold: float = None, counters: Iterable[str] | None = None, max_counters: int = None)

Plotting the outliers flagged for the counters

  • df_scored: DataFrame with outlier scores

  • data_is_hourly: whether the data is hourly (daily otherwise)

  • change_to_daily: whether hourly data is aggregated to daily data

  • threshold: outlier threshold set by user (defaults still set in config)

  • counters: optional list of counters to be visualized

  • max_counters: maximum number of counters to be plotted, also optional

  • cfg: pipeline config containing all configs

  • Figure visualizing time-series of counts for each counter, highlighting the outliers set by the given threshold

report(print_output: bool = True, save: bool = False, filepath: str = 'preprocess_report.txt') dict

Returns a dictionary containing summary information from the latest run.

  • print_output : boolean for printing the operation info

  • save : boolean for saving the info in a text file

  • filepath : Path of the text file to save, default=”preprocess_report.txt”

  • Dictionary with summary information from the latest pipeline run.

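run(df_raw: DataFrame, counter_col: str, timestamp_col: str, count_col: str, data_is_hourly: bool = True, change_to_daily: bool = False, threshold: float | None = None, metadata_cols: list | None = None) DataFrame

Runs the stage 1 procedure, then flags outliers using the given threshold and replaces them with NaN. The parameters and return value are described in the class description above.
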
mobts.preprocessing.pipeline.run_preprocess_stage_1(df_raw: DataFrame, counter_col: str, timestamp_col: str, count_col: str, cfg: PipelineConfig = PipelineConfig(cols=ColumnsConfig(counter='counter', timestamp='timestamp', count='count', weekday='weekday', week_num='week_num', how='how', hour='hour', date='date'), sparse=SparsityConfig(drop_sparse_counters=True, sparse_threshold=0.5), preprocess=PreprocessConfig(low_rel_daily=0.01, low_abs_daily=5, low_run_min_daily=2, zero_rate_max=0.05, zero_run_min=6, island_max_len=6, surround_min_len=12), stl=STLConfig(period=28, robust=False), outliers=OutlierConfig(threshold_daily=20, threshold_hourly=45), plot=PlotConfig(ncols=3, figsize_width=15, min_fig_height=10, height_per_row=3, linewidth_d=0.5, linewidth_h=0.3, marker_size=10, x_label_rotation=30, max_stations=None)), data_is_hourly: bool = True, change_to_daily: bool = False) DataFrame

First stage of preprocessing, from raw data to outlier score

  • df_raw: raw data provided by user

  • counter_col: counter column’s name fed by user

  • timestamp_col: timestamp column’s name fed by user

  • count_col: count column’s name fed by user

  • cfg: pipeline config containing all configs

  • data_is_hourly: whether the data is hourly (daily otherwise)

  • change_to_daily: whether hourly data is aggregated to daily data

  • DataFrame of all observations with their corresponding outlier score

mobts.preprocessing.plotting module

The plotting functions for checking the quality of outlier detection

This module contains: - the plot function for daily data - the plot function for hourly data

mobts.preprocessing.plotting._plot_outliers_daily(df_scored: DataFrame, threshold: float, counters: Iterable[str] | None = None, max_counters: float = None, cols: ColumnsConfig = ColumnsConfig(counter='counter', timestamp='timestamp', count='count', weekday='weekday', week_num='week_num', how='how', hour='hour', date='date'), out_cfg: OutlierConfig = OutlierConfig(threshold_daily=20, threshold_hourly=45), plot_cfg: PlotConfig = PlotConfig(ncols=3, figsize_width=15, min_fig_height=10, height_per_row=3, linewidth_d=0.5, linewidth_h=0.3, marker_size=10, x_label_rotation=30, max_stations=None))

Plotting outliers for daily data

  • df_scored: daily dataset with outlier score

  • threshold: outlier threshold set by user (has a default of 20)

  • counters: optional list of counters to be visualized

  • max_counters: maximum number of counters to be plotted, also optional

  • cols: Column config

  • out_cfg: outlier config

  • plot_cfg: plotting config

  • Figure visualizing time-series of counts for each counter, highlighting the outliers set by the given threshold

mobts.preprocessing.plotting._plot_outliers_hourly(df_scored: DataFrame, threshold: float, counters: Iterable[str] | None = None, max_counters: float = None, cols: ColumnsConfig = ColumnsConfig(counter='counter', timestamp='timestamp', count='count', weekday='weekday', week_num='week_num', how='how', hour='hour', date='date'), out_cfg: OutlierConfig = OutlierConfig(threshold_daily=20, threshold_hourly=45), plot_cfg: PlotConfig = PlotConfig(ncols=3, figsize_width=15, min_fig_height=10, height_per_row=3, linewidth_d=0.5, linewidth_h=0.3, marker_size=10, x_label_rotation=30, max_stations=None))

Plotting outliers for hourly data

  • df_scored: hourly dataset with outlier score

  • threshold: outlier threshold set by user (has a default of 45)

  • counters: optional list of counters to be visualized

  • max_counters: maximum number of counters to be plotted, also optional

  • cols: Column config

  • out_cfg: outlier config

  • plot_cfg: plotting config

  • Figure visualizing time-series of counts for each counter, highlighting the outliers set by the given threshold

Module contents