```
# 3. Lets define some use-case specific UDF(User Defined Functions)
def moving_average(data, window_size):
""" Computes moving average using discrete linear convolution of two one dimensional sequences.
Args:
-----
data (pandas.Series): independent variable
window_size (int): rolling window size
Returns:
--------
ndarray of linear convolution
References:
------------
[1] Wikipedia, "Convolution", http://en.wikipedia.org/wiki/Convolution.
[2] API Reference: https://docs.scipy.org/doc/numpy/reference/generated/numpy.convolve.html
"""
window = np.ones(int(window_size))/float(window_size)
return np.convolve(data, window, 'same')
def explain_anomalies(y, window_size, sigma=1.0):
""" Helps in exploring the anamolies using stationary standard deviation
Args:
-----
y (pandas.Series): independent variable
window_size (int): rolling window size
sigma (int): value for standard deviation
Returns:
--------
a dict (dict of 'standard_deviation': int, 'anomalies_dict': (index: value))
containing information about the points indentified as anomalies
"""
avg = moving_average(y, window_size).tolist()
residual = y - avg
# Calculate the variation in the distribution of the residual
std = np.std(residual)
return {'standard_deviation': round(std, 3),
'anomalies_dict': collections.OrderedDict([(index, y_i) for
index, y_i, avg_i in izip(count(), y, avg)
if (y_i > avg_i + (sigma*std)) | (y_i < avg_i - (sigma*std))])}
def explain_anomalies_rolling_std(y, window_size, sigma=1.0):
""" Helps in exploring the anamolies using rolling standard deviation
Args:
-----
y (pandas.Series): independent variable
window_size (int): rolling window size
sigma (int): value for standard deviation
Returns:
--------
a dict (dict of 'standard_deviation': int, 'anomalies_dict': (index: value))
containing information about the points indentified as anomalies
"""
avg = moving_average(y, window_size)
avg_list = avg.tolist()
residual = y - avg
# Calculate the variation in the distribution of the residual
testing_std = pd.rolling_std(residual, window_size)
testing_std_as_df = pd.DataFrame(testing_std)
rolling_std = testing_std_as_df.replace(np.nan,
testing_std_as_df.ix[window_size - 1]).round(3).iloc[:,0].tolist()
std = np.std(residual)
return {'stationary standard_deviation': round(std, 3),
'anomalies_dict': collections.OrderedDict([(index, y_i)
for index, y_i, avg_i, rs_i in izip(count(),
y, avg_list, rolling_std)
if (y_i > avg_i + (sigma * rs_i)) | (y_i < avg_i - (sigma * rs_i))])}
# This function is repsonsible for displaying how the function performs on the given dataset.
def plot_results(x, y, window_size, sigma_value=1,
text_xlabel="X Axis", text_ylabel="Y Axis", applying_rolling_std=False):
""" Helps in generating the plot and flagging the anamolies.
Supports both moving and stationary standard deviation. Use the 'applying_rolling_std' to switch
between the two.
Args:
-----
x (pandas.Series): dependent variable
y (pandas.Series): independent variable
window_size (int): rolling window size
sigma_value (int): value for standard deviation
text_xlabel (str): label for annotating the X Axis
text_ylabel (str): label for annotatin the Y Axis
applying_rolling_std (boolean): True/False for using rolling vs stationary standard deviation
"""
plt.figure(figsize=(15, 8))
plt.plot(x, y, "k.")
y_av = moving_average(y, window_size)
plt.plot(x, y_av, color='green')
plt.xlim(0, 1000)
plt.xlabel(text_xlabel)
plt.ylabel(text_ylabel)
# Query for the anomalies and plot the same
events = {}
if applying_rolling_std:
events = explain_anomalies_rolling_std(y, window_size=window_size, sigma=sigma_value)
else:
events = explain_anomalies(y, window_size=window_size, sigma=sigma_value)
x_anomaly = np.fromiter(events['anomalies_dict'].iterkeys(), dtype=int, count=len(events['anomalies_dict']))
y_anomaly = np.fromiter(events['anomalies_dict'].itervalues(), dtype=float,
count=len(events['anomalies_dict']))
plt.plot(x_anomaly, y_anomaly, "r*", markersize=12)
# add grid and lines and enable the plot
plt.grid(True)
plt.show()
```

```
# 4. Lets play with the functions
x = data_as_frame['Months']
Y = data_as_frame['SunSpots']
# plot the results
plot_results(x, y=Y, window_size=10, text_xlabel="Months", sigma_value=3,
text_ylabel="No. of Sun spots")
events = explain_anomalies(y, window_size=5, sigma=3)
# Display the anomaly dict
print("Information about the anomalies model:{}".format(events))
```