mobts.preprocessing package
Submodules
mobts.preprocessing.cleaning module
Cleaning the dataset for further operations
This module contains: - transforming the hourly dataset to a daily dataset - removing measurement errors for data with hourly frequency per counter - removing measurement errors for data with daily frequency per counter - a wrapper for removing measurement errors per counter - a function for removing measurement errors for the entire network
- mobts.preprocessing.cleaning._aggregate_hourly_to_daily(df: DataFrame, cols: ColumnsConfig = ColumnsConfig(counter='counter', timestamp='timestamp', count='count', weekday='weekday', week_num='week_num', how='how', hour='hour', date='date')) DataFrame
Aggregate hourly frequency to daily frequency
df: hourly dataset
cols: Column config
DataFrame with daily frequency
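As a rough illustration of this aggregation, the sketch below sums hourly counts into one value per counter and calendar day. It uses plain Python tuples instead of a DataFrame so that it runs without dependencies. The function name, the tuple layout, and the choice of summation are assumptions made for the example, not the package's actual implementation.

```python
from collections import defaultdict
from datetime import datetime


def aggregate_hourly_to_daily(rows):
    """Sum hourly counts into one total per (counter, date).

    `rows` is an iterable of (counter, timestamp, count) tuples; this
    layout is a hypothetical stand-in for the DataFrame columns.
    """
    totals = defaultdict(int)
    for counter, timestamp, count in rows:
        # Assumption: daily value = sum of that day's hourly counts.
        totals[(counter, timestamp.date())] += count
    # Return sorted (counter, date, daily_count) tuples.
    return [(c, d, n) for (c, d), n in sorted(totals.items())]


hourly = [
    ("A", datetime(2024, 1, 1, 8), 10),
    ("A", datetime(2024, 1, 1, 9), 15),
    ("A", datetime(2024, 1, 2, 8), 7),
    ("B", datetime(2024, 1, 1, 8), 3),
]
daily = aggregate_hourly_to_daily(hourly)
```

In this example the two hourly records of counter "A" on 1 January collapse into a single daily row.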
- mobts.preprocessing.cleaning._remove_measurement_errors_all_network(df_network: DataFrame, data_is_hourly: bool = True, change_to_daily: bool = False, cols: ColumnsConfig = ColumnsConfig(counter='counter', timestamp='timestamp', count='count', weekday='weekday', week_num='week_num', how='how', hour='hour', date='date'), cfg: PreprocessConfig = PreprocessConfig(low_rel_daily=0.01, low_abs_daily=5, low_run_min_daily=2, zero_rate_max=0.05, zero_run_min=6, island_max_len=6, surround_min_len=12)) DataFrame
Applying the cleaning function on the entire network
df_network: the DataFrame containing all counters
data_is_hourly: indicator of if data is hourly or not (daily otherwise)
change_to_daily: indicator of if we are aggregating hourly to daily data
cols: column config
cfg: config for preprocessing
DataFrame of all counters where counts suspected of being measurement errors are set to NaN
number of observations that have been changed to NaN by the function
number of counters that have been affected by the cleaning function
- mobts.preprocessing.cleaning._remove_measurement_errors_daily(df_counter: DataFrame, cols: ColumnsConfig = ColumnsConfig(counter='counter', timestamp='timestamp', count='count', weekday='weekday', week_num='week_num', how='how', hour='hour', date='date'), cfg: PreprocessConfig = PreprocessConfig(low_rel_daily=0.01, low_abs_daily=5, low_run_min_daily=2, zero_rate_max=0.05, zero_run_min=6, island_max_len=6, surround_min_len=12)) DataFrame
Remove measurement errors for data with daily frequency per counter
df_counter: daily dataset of a single counter
cols: column config
cfg: config for preprocessing
Single counter DataFrame where daily counts suspected of being measurement errors are set to NaN
- The process includes removing counts in the following conditions:
Days recorded with 0 observations are set to NaN.
A threshold is calculated for low-observation noise. This threshold is the maximum of a pre-set ‘low_abs_daily’ count and ‘low_rel_daily’ (in %) of a baseline, which is defined as the median of all counts. Observations under this threshold are considered “low counts”. If these low counts persist for longer than ‘low_run_min_daily’, they are set to NaN as well.
Considering the aggregate nature of daily data, compared to hourly data, the removal of 0s and low counts is applied more generously.
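The two daily rules can be sketched in plain Python as follows. This is a minimal illustration rather than the package's code: the function name is hypothetical, ‘low_rel_daily’ is treated as a fraction of the median (0.01 meaning 1%), and a low run is removed once it reaches ‘low_run_min_daily’ days. Both readings are assumptions.

```python
import math
from statistics import median


def flag_daily_errors(counts, low_abs=5, low_rel=0.01, low_run_min=2):
    """Return a copy of `counts` with suspected errors replaced by NaN."""
    valid = [c for c in counts if not math.isnan(c)]
    baseline = median(valid) if valid else 0.0
    # Assumption: low_rel is a fraction of the baseline (0.01 == 1%).
    threshold = max(low_abs, low_rel * baseline)
    cleaned = list(counts)

    # Rule 1: zero-count days are always removed.
    for i, c in enumerate(counts):
        if c == 0:
            cleaned[i] = math.nan

    # Rule 2: runs of low counts are removed when long enough.
    # Assumption: "long enough" means at least `low_run_min` days.
    run_start = None
    for i in range(len(counts) + 1):
        is_low = i < len(counts) and counts[i] < threshold
        if is_low and run_start is None:
            run_start = i
        elif not is_low and run_start is not None:
            if i - run_start >= low_run_min:
                for j in range(run_start, i):
                    cleaned[j] = math.nan
            run_start = None
    return cleaned


example = [100, 120, 0, 3, 2, 110, 4, 130]
cleaned_example = flag_daily_errors(example)
nan_mask = [math.isnan(x) for x in cleaned_example]
```

Here the three-day run of 0, 3 and 2 is blanked out, while the isolated low count of 4 survives because its run is shorter than the minimum length.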
- mobts.preprocessing.cleaning._remove_measurement_errors_hourly(df_counter: DataFrame, cols: ColumnsConfig = ColumnsConfig(counter='counter', timestamp='timestamp', count='count', weekday='weekday', week_num='week_num', how='how', hour='hour', date='date'), cfg: PreprocessConfig = PreprocessConfig(low_rel_daily=0.01, low_abs_daily=5, low_run_min_daily=2, zero_rate_max=0.05, zero_run_min=6, island_max_len=6, surround_min_len=12)) DataFrame
Remove measurement errors for data with hourly frequency per counter
df_counter: hourly dataset of a single counter
cols: Column config
cfg: config for preprocessing
Single counter DataFrame where hourly counts suspected of being measurement errors are set to NaN
- The process includes removing counts in the following conditions:
If a counter has recorded 0 counts for an entire day, the entire day is set to NaN.
For a zero count in a given hour to be eligible for removal, the hour has to be outside night hours, and the rate of 0 observations for that hour needs to be lower than ‘zero_rate_max’. Additionally, the median observation of the hour has to be at least one standard deviation higher than zero.
Once the conditions in the previous point are met, only 0 records that are consecutive for ‘zero_run_min’ hours are set to NaN.
Observation islands with a duration lower than ‘island_max_len’ that are surrounded by runs of zero or NaN observations longer than ‘surround_min_len’ are also set to NaN.
Considering noise and variations of hourly data, strict conditions are set as explained above to prevent unnecessary loss of data.
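The run-based part of these rules (consecutive zeros and short observation islands) can be illustrated with a small run-length sketch. It deliberately ignores the night-hour, zero-rate and median conditions, and it assumes that an island must be flanked by long gaps on both sides. The function names are hypothetical, and the example uses small parameter values for readability instead of the defaults.

```python
import math
from itertools import groupby


def _is_gap(value):
    """A gap is a zero or missing (NaN) observation."""
    return value == 0 or math.isnan(value)


def remove_zero_runs_and_islands(counts, zero_run_min=6,
                                 island_max_len=6, surround_min_len=12):
    cleaned = list(counts)

    # Step A: blank out runs of at least `zero_run_min` consecutive zeros.
    pos = 0
    for is_zero, group in groupby(counts, key=lambda v: v == 0):
        length = len(list(group))
        if is_zero and length >= zero_run_min:
            cleaned[pos:pos + length] = [math.nan] * length
        pos += length

    # Step B: split the series into alternating gap / data runs.
    runs = []  # (is_gap, start, length)
    pos = 0
    for gap, group in groupby(cleaned, key=_is_gap):
        length = len(list(group))
        runs.append((gap, pos, length))
        pos += length

    # A short data run flanked by two long gaps is treated as an island.
    for k in range(1, len(runs) - 1):
        gap, start, length = runs[k]
        if gap or length >= island_max_len:
            continue
        before, after = runs[k - 1][2], runs[k + 1][2]
        if before > surround_min_len and after > surround_min_len:
            cleaned[start:start + length] = [math.nan] * length
    return cleaned


series = [5, 6, 0, 0, 0, 0, 7, 8, 0, 0, 0, 9, 4, 6, 5]
result = remove_zero_runs_and_islands(
    series, zero_run_min=3, island_max_len=3, surround_min_len=2
)
island_mask = [math.isnan(x) for x in result]
```

In the example, both zero runs are removed first, and the two readings (7, 8) stranded between them are then removed as an island.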
- mobts.preprocessing.cleaning._remove_measurement_errors_wrapper_counter(df_counter: DataFrame, data_is_hourly: bool, change_to_daily: bool, cols: ColumnsConfig = ColumnsConfig(counter='counter', timestamp='timestamp', count='count', weekday='weekday', week_num='week_num', how='how', hour='hour', date='date'), cfg: PreprocessConfig = PreprocessConfig(low_rel_daily=0.01, low_abs_daily=5, low_run_min_daily=2, zero_rate_max=0.05, zero_run_min=6, island_max_len=6, surround_min_len=12)) DataFrame
Wrapper for choosing the measurement error remover by frequency for a single counter
df_counter: dataset of a single counter (hourly or daily)
data_is_hourly: indicator of if data is hourly or not (daily otherwise)
change_to_daily: indicator of if we are aggregating hourly to daily data
cols: column config
cfg: config for preprocessing
Single counter DataFrame where counts suspected of being measurement errors are set to NaN
mobts.preprocessing.outliers module
Calculating and assigning an outlier score to each observation
This module contains: - calculating outlier score for daily data - calculating outlier score for hourly data - applying outlier score to all counters in the network
- mobts.preprocessing.outliers._calculate_outlier_score(df: DataFrame, data_is_hourly: bool = True, change_to_daily: bool = False, cols: ColumnsConfig = ColumnsConfig(counter='counter', timestamp='timestamp', count='count', weekday='weekday', week_num='week_num', how='how', hour='hour', date='date'), stl_cfg: STLConfig = STLConfig(period=28, robust=False)) DataFrame
Calculating outlier score for each observation based on temporal frequency
df: complete dataset
data_is_hourly: indicator of if data is hourly or not (daily otherwise)
change_to_daily: indicator of if we are aggregating hourly to daily data
cols: Column config
stl_cfg: config for STL
DataFrame of all observations with their corresponding outlier score
the choice of outlier threshold downstream in the code
- mobts.preprocessing.outliers._calculate_outlier_score_counter_daily(df_counter_daily: DataFrame, cols: ColumnsConfig = ColumnsConfig(counter='counter', timestamp='timestamp', count='count', weekday='weekday', week_num='week_num', how='how', hour='hour', date='date'), cfg: STLConfig = STLConfig(period=28, robust=False)) DataFrame
Calculating outlier score for daily data
df_counter_daily: daily dataset for a single counter
cols: Column config
cfg: STL config
DataFrame of single counter with aggregate daily observations, and corresponding outlier scores
To avoid assigning high scores to observations that are prevalent, the outlier score is increased for quantile 99, and is decreased for observations between 25th and 75th quantiles
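One way to picture this quantile-based adjustment is the sketch below. It is purely illustrative: the multipliers `boost` and `damp`, the function name, and the decision to take quantiles over the observed values are all assumptions, since the documentation does not state how large the adjustment is or which quantity the quantiles refer to.

```python
from statistics import quantiles


def adjust_scores(values, scores, boost=1.5, damp=0.5):
    """Scale raw scores up in the extreme tail and down in the middle."""
    # quantiles(..., n=100) returns 99 cut points; index p-1 is the
    # p-th percentile.
    cuts = quantiles(values, n=100, method="inclusive")
    q25, q75, q99 = cuts[24], cuts[74], cuts[98]
    adjusted = []
    for v, s in zip(values, scores):
        if v >= q99:
            s *= boost   # hypothetical increase for the top percentile
        elif q25 <= v <= q75:
            s *= damp    # hypothetical decrease for prevalent values
        adjusted.append(s)
    return adjusted


values = list(range(1, 102))       # 1, 2, ..., 101
raw_scores = [10.0] * len(values)  # identical raw scores for clarity
adjusted = adjust_scores(values, raw_scores)
```

With identical raw scores, only the position of each value in the distribution changes the result: mid-range values end up with a lower score and the largest values with a higher one.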
- mobts.preprocessing.outliers._calculate_outlier_score_counter_hourly(df_couter_hourly: DataFrame, cols: ColumnsConfig = ColumnsConfig(counter='counter', timestamp='timestamp', count='count', weekday='weekday', week_num='week_num', how='how', hour='hour', date='date')) DataFrame
Calculating outlier score for hourly data
df_counter_hourly: hourly dataset for a single counter
cols: Column config
DataFrame of single counter with aggregate hourly observations, and corresponding outlier scores
STL is computationally heavy on hourly data, with a period of 168 hours. Furthermore,
To avoid assigning high scores to observations that are prevalent, the outlier score is increased for quantile 99, and is decreased for observations between 25th and 75th quantiles
mobts.preprocessing.pipeline module
The pipeline for preprocessing subpackage
This module contains: - the function for the first stage of preprocessing, which includes:
standardizing the input given by the user
removing observations with undefined counter names (optional)
aggregate hourly to daily frequency (optional)
remove counters with sparse observations
calculate outlier scores for each observation
the function for applying the user’s threshold, flagging outliers, and replacing them with NaN
- the ‘preprocess’ class containing a .run function that contains:
running the stage 1 procedure explained above
flagging and replacing outliers with NaN using given threshold
the plot function for visualizing detected outliers based on given threshold
Note that the main class is meant to be the entry point. However, it warns the user that the default thresholds were set based on the study case data and might not suit other datasets. The warning suggests running ‘run_preprocess_stage_1’ first, then tuning the threshold while monitoring outlier detection quality with the ‘plot_outliers’ function. The final dataset can then be produced directly with the ‘apply_threshold’ function.
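The "remove counters with sparse observations" step from the stage 1 list can be pictured with the following sketch. It assumes that a counter is sparse when its share of missing observations exceeds ‘sparse_threshold’. The documentation does not define sparsity precisely, so both this definition and the function name are hypothetical.

```python
import math


def drop_sparse_counters(series_by_counter, sparse_threshold=0.5):
    """Keep only counters whose share of missing values is acceptable."""
    kept = {}
    for counter, counts in series_by_counter.items():
        missing = sum(1 for c in counts if math.isnan(c))
        # Assumption: a counter is dropped when more than
        # `sparse_threshold` of its observations are missing.
        if counts and missing / len(counts) <= sparse_threshold:
            kept[counter] = counts
    return kept


nan = math.nan
network = {
    "A": [1, 2, nan, 4],      # 25% missing
    "B": [nan, nan, nan, 4],  # 75% missing
    "C": [nan, nan, 5, 6],    # 50% missing
}
kept_counters = list(drop_sparse_counters(network))
```

Counter "B" is dropped, while "C" sits exactly on the threshold and is kept under this reading.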
- mobts.preprocessing.pipeline.apply_threshold(df_scored: DataFrame, data_is_hourly: str, change_to_daily: str, cfg: PipelineConfig = PipelineConfig(cols=ColumnsConfig(counter='counter', timestamp='timestamp', count='count', weekday='weekday', week_num='week_num', how='how', hour='hour', date='date'), sparse=SparsityConfig(drop_sparse_counters=True, sparse_threshold=0.5), preprocess=PreprocessConfig(low_rel_daily=0.01, low_abs_daily=5, low_run_min_daily=2, zero_rate_max=0.05, zero_run_min=6, island_max_len=6, surround_min_len=12), stl=STLConfig(period=28, robust=False), outliers=OutlierConfig(threshold_daily=20, threshold_hourly=45), plot=PlotConfig(ncols=3, figsize_width=15, min_fig_height=10, height_per_row=3, linewidth_d=0.5, linewidth_h=0.3, marker_size=10, x_label_rotation=30, max_stations=None)), threshold: float | None = None) DataFrame
Applying the given threshold and replacing the outliers with NaN
df_scored: DataFrame with outlier scores
data_is_hourly: indicator of if data is hourly or not (daily otherwise)
change_to_daily: indicator of if we are aggregating hourly to daily data
cfg: pipeline config containing all configs
threshold: outlier threshold set by user (defaults still set in config)
Cleaned final dataset with measurement errors removed and replaced by NaN
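The thresholding idea can be sketched independently of the package as follows. The helper name `mask_outliers` is hypothetical, and the sketch assumes that an observation is an outlier when its score is strictly greater than the threshold. The actual comparison used by ‘apply_threshold’ is not stated here.

```python
import math


def mask_outliers(counts, scores, threshold):
    """Flag observations whose score exceeds `threshold` and blank them."""
    # Assumption: strictly greater than the threshold means outlier.
    flags = [s > threshold for s in scores]
    cleaned = [math.nan if f else c for c, f in zip(counts, flags)]
    return cleaned, flags


counts = [120, 115, 900, 118]
scores = [3.0, 2.5, 61.0, 20.0]
cleaned_counts, outlier_flags = mask_outliers(counts, scores, threshold=20)
```

Raising or lowering `threshold` and re-running the function mirrors the tuning loop suggested by the pipeline warning.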
- class mobts.preprocessing.pipeline.preprocess(cfg: PipelineConfig = PipelineConfig(cols=ColumnsConfig(counter='counter', timestamp='timestamp', count='count', weekday='weekday', week_num='week_num', how='how', hour='hour', date='date'), sparse=SparsityConfig(drop_sparse_counters=True, sparse_threshold=0.5), preprocess=PreprocessConfig(low_rel_daily=0.01, low_abs_daily=5, low_run_min_daily=2, zero_rate_max=0.05, zero_run_min=6, island_max_len=6, surround_min_len=12), stl=STLConfig(period=28, robust=False), outliers=OutlierConfig(threshold_daily=20, threshold_hourly=45), plot=PlotConfig(ncols=3, figsize_width=15, min_fig_height=10, height_per_row=3, linewidth_d=0.5, linewidth_h=0.3, marker_size=10, x_label_rotation=30, max_stations=None)))
Bases: object
This class consists of: + running the stage 1 procedure explained above + flagging and replacing outliers with NaN using given threshold
df_raw: raw data provided by user
data_is_hourly: indicator of if data is hourly or not (daily otherwise)
change_to_daily: indicator of if we are aggregating hourly to daily data
counter_col: counter column’s name fed by user
timestamp_col: timestamp column’s name fed by user
count_col: count column’s name fed by user
threshold: outlier threshold set by user (defaults still set in config)
cfg: pipeline config containing all configs
Cleaned final dataset with measurement errors removed and replaced by NaN
Here, the function is run either by the default thresholds, or a threshold PREVIOUSLY OPTIMIZED by the user
- plot_outliers(threshold: float = None, counters: Iterable[str] | None = None, max_counters: int = None)
Plotting the outliers flagged for the counters
df_scored: DataFrame with outlier scores
data_is_hourly: indicator of if data is hourly or not (daily otherwise)
change_to_daily: indicator of if we are aggregating hourly to daily data
threshold: outlier threshold set by user (defaults still set in config)
counters: optional list of counters to be visualized
max_counters: maximum number of counters to be plotted, also optional
cfg: pipeline config containing all configs
Figure visualizing time-series of counts for each counter, highlighting the outliers set by the given threshold
- report(print_output: bool = True, save: bool = False, filepath: str = 'preprocess_report.txt') dict
Returns a dictionary containing summary information from the latest run.
print_output : boolean for printing the operation info
save : boolean for saving the info in a text file
filepath : Path of the text file to save, default=”preprocess_report.txt”
Dictionary with summary information from the latest pipeline run.
- run(df_raw: DataFrame, counter_col: str, timestamp_col: str, count_col: str, data_is_hourly: bool = True, change_to_daily: bool = False, threshold: float | None = None, metadata_cols: list | None = None) DataFrame
- mobts.preprocessing.pipeline.run_preprocess_stage_1(df_raw: DataFrame, counter_col: str, timestamp_col: str, count_col: str, cfg: PipelineConfig = PipelineConfig(cols=ColumnsConfig(counter='counter', timestamp='timestamp', count='count', weekday='weekday', week_num='week_num', how='how', hour='hour', date='date'), sparse=SparsityConfig(drop_sparse_counters=True, sparse_threshold=0.5), preprocess=PreprocessConfig(low_rel_daily=0.01, low_abs_daily=5, low_run_min_daily=2, zero_rate_max=0.05, zero_run_min=6, island_max_len=6, surround_min_len=12), stl=STLConfig(period=28, robust=False), outliers=OutlierConfig(threshold_daily=20, threshold_hourly=45), plot=PlotConfig(ncols=3, figsize_width=15, min_fig_height=10, height_per_row=3, linewidth_d=0.5, linewidth_h=0.3, marker_size=10, x_label_rotation=30, max_stations=None)), data_is_hourly: bool = True, change_to_daily: bool = False) DataFrame
First stage of preprocessing, from raw data to outlier score
df_raw: raw data provided by user
counter_col: counter column’s name fed by user
timestamp_col: timestamp column’s name fed by user
count_col: count column’s name fed by user
cfg: pipeline config containing all configs
data_is_hourly: indicator of if data is hourly or not (daily otherwise)
change_to_daily: indicator of if we are aggregating hourly to daily data
DataFrame of all observations with their corresponding outlier score
mobts.preprocessing.plotting module
The plotting functions for checking the quality of outlier detection
This module contains: - the plot function for daily data - the plot function for hourly data
- mobts.preprocessing.plotting._plot_outliers_daily(df_scored: DataFrame, threshold: float, counters: Iterable[str] | None = None, max_counters: float = None, cols: ColumnsConfig = ColumnsConfig(counter='counter', timestamp='timestamp', count='count', weekday='weekday', week_num='week_num', how='how', hour='hour', date='date'), out_cfg: OutlierConfig = OutlierConfig(threshold_daily=20, threshold_hourly=45), plot_cfg: PlotConfig = PlotConfig(ncols=3, figsize_width=15, min_fig_height=10, height_per_row=3, linewidth_d=0.5, linewidth_h=0.3, marker_size=10, x_label_rotation=30, max_stations=None))
Plotting outliers for daily data
df_scored: daily dataset with outlier score
threshold: outlier threshold set by user (has a default of 20)
counters: optional list of counters to be visualized
max_counters: maximum number of counters to be plotted, also optional
cols: Column config
out_cfg: outlier config
plot_cfg: plotting config
Figure visualizing time-series of counts for each counter, highlighting the outliers set by the given threshold
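The PlotConfig fields suggest a grid of subplots, one panel per counter. A plausible way to derive the grid and figure size from those fields is sketched below. The formula and the function name are assumptions based only on the parameter names (‘ncols’, ‘figsize_width’, ‘min_fig_height’, ‘height_per_row’), not on the package's source.

```python
import math


def grid_layout(n_counters, ncols=3, figsize_width=15,
                min_fig_height=10, height_per_row=3):
    """Return (nrows, ncols, figsize) for a one-panel-per-counter grid."""
    nrows = max(1, math.ceil(n_counters / ncols))
    # Assumption: height grows with the number of rows but never drops
    # below the configured minimum.
    height = max(min_fig_height, nrows * height_per_row)
    return nrows, ncols, (figsize_width, height)


small = grid_layout(7)    # 3 rows, minimum height applies
large = grid_layout(14)   # 5 rows, height scales with rows
```

The resulting tuple could be passed to a plotting library's subplot constructor, for example as the `figsize` argument of matplotlib's `plt.subplots`.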
- mobts.preprocessing.plotting._plot_outliers_hourly(df_scored: DataFrame, threshold: float, counters: Iterable[str] | None = None, max_counters: float = None, cols: ColumnsConfig = ColumnsConfig(counter='counter', timestamp='timestamp', count='count', weekday='weekday', week_num='week_num', how='how', hour='hour', date='date'), out_cfg: OutlierConfig = OutlierConfig(threshold_daily=20, threshold_hourly=45), plot_cfg: PlotConfig = PlotConfig(ncols=3, figsize_width=15, min_fig_height=10, height_per_row=3, linewidth_d=0.5, linewidth_h=0.3, marker_size=10, x_label_rotation=30, max_stations=None))
Plotting outliers for hourly data
df_scored: hourly dataset with outlier score
threshold: outlier threshold set by user (has a default of 45)
counters: optional list of counters to be visualized
max_counters: maximum number of counters to be plotted, also optional
cols: Column config
out_cfg: outlier config
plot_cfg: plotting config
Figure visualizing time-series of counts for each counter, highlighting the outliers set by the given threshold