Title    | Grouping, Rolling, and Expanding in pandas
Topic    | Grouping (.groupby) and Window (.rolling, .expanding) Functions in pandas
Date     | Fri Nov
Keywords | {DataFrame,Series}.groupby, .groupby.agg, .groupby.apply, .groupby.transform, .rolling, .expanding
These sessions are designed for a broad audience of non-software engineers and software programmers of all backgrounds and skill levels.
Our expected audience should comprise attendees with a…
During this session, we will endeavour to guide our audience to developing an understanding of pandas.{Series,DataFrame}.groupby and grouping operations: .groupby, .expanding, .rolling, and .ewm, including use with time series data and use of the corresponding Rolling, Window, Expanding, and ExponentialMovingWindow objects, as well as combined patterns such as .groupby(…).transform(lambda df: df.rolling(…)).
In a previous seminar, we looked at the pandas.DataFrame and how it provides a data type for manipulating like-indexed columnar datasets. In this seminar, we will look at .groupby in depth, as well as the various reduction operations it supports (.apply, .transform, .aggregate). We will explore .groupby in the context of other window functions in pandas—functions which operate on “windows” of multiple rows or multiple columns to perform aggregations or other transformations.
We’ll also look at .rolling, .expanding, and .ewm, their various options and modalities, as well as the operations available on the Window, Rolling, Expanding, and ExponentialMovingWindow objects they return.
We’ll discuss these operations in the context of time series analysis and cover performance considerations related to the use of each.
Agenda:
.groupby
.groupby.agg
.groupby.apply
.groupby.transform
.rolling
.expanding
.ewm
Did you enjoy this seminar? Did you learn something new that will help you as
you use pandas
and window functions more and more in your work?
In a future seminar, we may dive deeper into the use of pandas. We can:
explore .rolling with .tseries.offsets and other time series considerations in greater depth
use numba, cython, or other tools to address performance issues with non-native window operations
If you’re interested in any of these topics, please let us know! Send us an e-mail at learning@dutc.io or contact us over Workplace with your feedback!
print("Let's go!")
print("Let's go!")
print("Let's go!")
Let’s start with a pandas.DataFrame
with some random data:
# data.py (later cells also import s, big_df, huge_df, and timed from this module; those definitions are elided here)
from pandas import DataFrame, date_range, to_timedelta, Categorical
from numpy import repeat, array, tile
from random import randrange
from string import ascii_lowercase
from matplotlib.pyplot import plot, show
from numpy.random import default_rng
rng = default_rng(0)
datacenters = ['ABC0', 'XYZ1']
devices = rng.choice([*ascii_lowercase], size=(5, 4)).view('<U4').ravel()
times = date_range('2020-01-01 9:00', periods=(num_periods := 5), freq='1H')
df = DataFrame({
    'datacenter': repeat(datacenters, len(devices) * len(times)),
    'device': tile(devices, len(datacenters) * len(times)),
    'time': repeat(tile(times, len(datacenters)), len(devices))
            + to_timedelta(rng.integers(0, 59, size=len(devices)*len(times)*len(datacenters)), unit='T'),
    'temperature': tile(rng.normal(loc=70, scale=10, size=len(devices)), len(datacenters) * len(times))
                   + rng.normal(size=(len(devices), len(datacenters), len(times))).cumsum(axis=-1).ravel(),
    'traffic': rng.integers(100, 100_000, size=len(devices)*len(datacenters)*len(times)),
})
print(
    # df,
    df.sample(6),
)
.groupby and the .index
Let’s quickly refresh what we know about the pandas .index and the .groupby operation.
A pandas.array is a 1-D dataset containing homogeneous data, structured to allow for efficient storage and processing (using the “restricted computation domain” idea). The elements of this dataset can be accessed by their position (their “integer location”).
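For instance, a minimal sketch (the dtype and values here are illustrative):
from pandas import array
arr = array([10, 20, 30], dtype='Int64')  # homogeneous, typed 1-D storage
print(arr[0])     # access by integer position -> 10
print(arr.dtype)  # Int64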
A pandas.Series is a pandas.array combined with a pandas.Index. The elements of this object can be accessed by their position using the .iloc[] API or by their “label” using the .loc[] API.
A pandas.Index is an object which provides a mechanism for converting “label” values into “position” values. It implements the .get_loc interface. It is used by the .loc[] API in a pandas.Series and pandas.DataFrame to allow the contents of these datasets to be accessed by “label.”
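A quick sketch of this label→position translation (values illustrative):
from pandas import Index
idx = Index(['a', 'b', 'c'])
print(idx.get_loc('b'))  # label 'b' -> position 1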
A pandas.DataFrame is a 2-D “flat” (typically taller-than-wide) dataset, with multiple 1- or 2-D homogeneous datasets (accessible via ._data) along with two indices:
.index, which allows selection of rows
.columns, which allows selection of columns
Note that all operations performed between pandas.Series or pandas.DataFrame objects are “index-aligned”; e.g., when performing s1 + s2 on two pandas.Series objects, we first align the two datasets using their .indexes. Note that a pandas.DataFrame can be thought of as a collection of multiple 1-D datasets with a common .index.
from pandas import Series
s1 = Series({'a': 1, 'b': 2, 'c': 3})
s2 = Series({ 'b': 2, 'c': 3, 'd': 4})
print(
    s1,
    s2,
    # s1.iloc[0], s2.iloc[0],
    # s1.loc['a'], s2.loc['b'],
    # s1 + s2,
    s1.add(s2, fill_value=0),
    sep='\n',
)
from pandas import DataFrame, date_range
df1 = DataFrame({
    'a': [0, 1, 2],
    'b': [3, 4, 5],
}, index=date_range('2021-05-06', periods=3))
df2 = DataFrame({
    'b': [6, 7, 8],
    'c': [9, 10, 11],
}, index=date_range('2021-05-07', periods=3))
print(
    # df1,
    # df2,
    # df1['a'],
    # df1.loc['2021-05-06'],
    # df1.iloc[[0, 1]],
    # df1.index,
    # df1.columns,
    # df1 + df2,
    # df1.add(df2, fill_value=0),
    sep='\n',
)
The .groupby operation allows us to perform computations on either horizontal or vertical groupings of data.
from data import df, s  # s appears to be a Series keyed by device name, defined in the full data.py (not shown above)
print(
    # df.head(5),
    # df.groupby('datacenter').mean().index,
    # df.groupby('device').mean().index,
    # df.groupby(['device', 'datacenter']).min(),
    # df.groupby(['device', 'datacenter']).mean().unstack(),
    # df.pivot_table(index='device', columns='datacenter', aggfunc='mean'),
    # s,
    # df.groupby(s.loc[df['device']].values).mean(),
    # df.groupby(lambda idx: idx % 2).mean(),
    sep='\n',
)
With .groupby, we can also use the .apply, .aggregate, .transform, and .filter methods. These largely fit into the following categories:
.filter: keep or drop entire groups based on a per-group predicate
.transform: produce a like-indexed result, with one value per original row
.aggregate (.agg): reduce each group, column by column, to a single value
.apply: run an arbitrary function per group and let pandas combine the results
A side-by-side sketch of these four families follows below.
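Here is a minimal, self-contained sketch contrasting the four families on a toy frame (the frame, column names, and threshold are illustrative, not from the seminar data):
from pandas import DataFrame
d = DataFrame({'g': ['a', 'a', 'b'], 'x': [1., 2., 3.]})
gb = d.groupby('g')['x']
print(gb.filter(lambda s: s.mean() > 2))  # filter: drops group 'a' entirely
print(gb.transform('mean'))               # transform: like-indexed, one value per row
print(gb.agg('mean'))                     # aggregate: one value per group
print(gb.apply(lambda s: s.cumsum()))     # apply: arbitrary per-group computation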
Let’s first look at .filter:
from data import df
# for k, g in df.groupby('device'):
#     print(f'{k}'.center(50, '-'))
#     print(g)
#     break
print(
    # df.groupby('device'),
    # df.groupby('device').mean(),
    # df[df['device'].str.startswith('e')],
    # df.groupby('device').filter(
    #     lambda g: (g['temperature'].max() - g['temperature'].min()) / g['temperature'].max() > 0.08
    #     # lambda g: print(g)
    # ),
    # df.set_index('device').sort_index().index.is_monotonic,
    # df.set_index(['device', 'datacenter', 'time']).sort_index(),
    # df.assign(time=df['time'].dt.floor('1H')).groupby(['device', 'time']).filter(
    #     lambda g: (g['temperature'].max() - g['temperature'].min()) / g['temperature'].max() > 0.04
    # ),
    # df.groupby(['device', df['time'].dt.floor('1H')]).filter(
    #     lambda g: (g['temperature'].max() - g['temperature'].min()) / g['temperature'].max() > 0.04
    # ),
    df.set_index(['datacenter', 'device', 'time'])
      .pipe(lambda df:
          df.groupby(['device', df.index.get_level_values('time').floor('1H')]).filter(
              lambda g: (g['temperature'].max() - g['temperature'].min()) / g['temperature'].max() > 0.04
          )
      ),
)
Let’s next look at .transform:
In a transformation, the result must be “like-indexed.” The function used with .transform is also restricted in the following ways: it receives each column one at a time, as a Series; and it must return either a scalar (which is broadcast across the group) or a result of the same length as the group, so the output can be aligned back to the original index. A small sketch of this rule follows below.
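A minimal sketch of the like-indexed rule (toy data; the mismatched-length case is left commented out because it errors):
from pandas import DataFrame
d = DataFrame({'g': ['a', 'a', 'b'], 'x': [1., 2., 3.]})
print(d.groupby('g')['x'].transform('mean'))                  # scalar per group, broadcast back
print(d.groupby('g')['x'].transform(lambda s: s - s.mean()))  # like-length result
# d.groupby('g')['x'].transform(lambda s: s.head(1))          # wrong length -> error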
from data import df
df = df.set_index(['datacenter', 'device', 'time']).sort_index()
# sketch of the intended output: per-group temperatures normalised against the first reading, e.g.
#   abc0 xyz  9:00  100  1
#            10:00   99  .99
#            11:00   95  .4...
print(
    df.sample(3),
    df.groupby(['device', 'datacenter'])['temperature'].transform(
        lambda s: (s / s.iloc[0])
    ),
    # df.set_index(['datacenter', 'device']).sort_index()
    #   .groupby(['datacenter', 'device'])['temperature']
    #   .transform(
    #       lambda s: s / s.iloc[0]
    #   ),
)
from pandas import Series, Categorical
from data import df
s = Series({'ABC0': 1, 'XYZ1': 100})
s.index = Categorical(s.index)
s.index.name = 'datacenter'
df['datacenter'] = Categorical(df['datacenter'], categories=s.index.categories)
print(
    # df.head(),
    # df.set_index(['device', 'datacenter']).pipe(
    #     lambda df: df.assign(price=df['temperature'] * s)
    # ),
)
If we want to perform an aggregation, we can use .aggregate or .agg. .aggregate works on a column-by-column basis.
from data import df
from scipy.stats import zscore
df = df.set_index(['datacenter', 'device', 'time']).sort_index()
print(
    # df.groupby('device').mean(),
    # df.groupby('device').agg('mean'),
    # df.groupby('device').agg({'temperature': 'mean', 'traffic': ['max', 'min']}),
    # df.groupby(['device', 'datacenter']).agg(
    #     lambda s: s.mean()
    # ),
    # df.groupby(['device', 'datacenter']).agg({'temperature':
    #     lambda s: s.mean()
    # }),
    # df.groupby(['device', 'datacenter']).agg({'temperature':
    #     lambda s: zscore(s)[-1]
    # }),
)
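To make the column-by-column behaviour concrete, a small self-contained sketch (toy frame; the column names are illustrative): the aggregating function is called once per group per column, and receives each column as a named Series.
from pandas import DataFrame
d = DataFrame({'g': ['a', 'a', 'b'], 'x': [1., 2., 3.], 'y': [4., 5., 6.]})
print(d.groupby('g').agg(lambda s: f'{s.name}: mean={s.mean()}'))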
Finally, we have another means by which we can do aggregation operations: .apply. .apply is extremely flexible, but it has the downside of being much slower in practice than .aggregate or .transform. .apply takes a function which accepts a DataFrame as its argument, returning a new DataFrame; the .apply machinery determines how to combine the result DataFrames into a new structure. .apply operates on a window-by-window basis.
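As a small self-contained illustration (toy frame): each per-group result below is a whole DataFrame of a different shape, and .apply concatenates them under a new index.
from pandas import DataFrame
d = DataFrame({'g': ['a', 'a', 'b'], 'x': [1., 2., 3.]})
print(d.groupby('g').apply(lambda df: df.describe()))  # per-group summaries, recombined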
from data import df
from pandas import Series, Interval, concat
factors = Series({
    Interval(0, 50_000, closed='left'): 1,
    Interval(50_000, 100_000, closed='left'): 2,
})
df = df.set_index(['device', 'datacenter', 'time']).sort_index()
print(
    # df.groupby(['device', 'datacenter']).apply(
    #     lambda df: (df['temperature'] * factors.loc[df['traffic']].values).mean()
    # ),
    # df.groupby(['device', 'datacenter']).apply(
    #     lambda df: concat([df, df])
    # ),
)
.rolling, .expanding, and .ewm
Let’s take a look at a computation involving an expanding window:
from data import df
from itertools import islice
df = df.set_index(['device', 'datacenter', 'time']).sort_index()
print(
    # df.expanding(),
    # df.groupby('device'),
    # df.expanding(min_periods=1),
    # df.groupby(['device', 'datacenter'])['traffic'].transform(
    #     lambda s: s.expanding().mean()
    # ).head(5),
    df.groupby(['device', 'datacenter'])['temperature'].transform(
        lambda s: s.expanding(min_periods=1).agg(
            lambda s: s.diff().abs().sum()
        )
    ),
    # df.set_index(['device', 'datacenter']).groupby(['device', 'datacenter'])['temperature'].transform(
    #     lambda s: s.expanding(min_periods=1).agg(
    #         lambda s: s.diff().abs().sum()
    #     )
    # )
    # .dropna(),
)
for g in islice(
    df['temperature'].expanding(min_periods=1),
    3,
):
    print(g.diff().abs().sum())
# for g in islice(
#     df.groupby(['device', 'datacenter']).expanding(min_periods=1),
#     None,
# ):
#     print(g)
As an aside, let’s consider the sampling rate of our data:
from data import df
from pandas import date_range, concat
df = df.set_index(['device', 'datacenter', 'time']).sort_index()
print(
    # df,
    # df.resample('30T'),
    df.groupby(['device', 'datacenter'])['temperature'].apply(
        lambda g: g.droplevel(['device', 'datacenter']).resample('30T').mean()
    ).interpolate(method='linear'),
    # df.set_index('time').resample('30T'),
    # df.set_index('time').resample('30T').first(),
    # df.set_index(['device', 'datacenter', 'time']).resample('30T').first(),
    # df.set_index(['device', 'datacenter', 'time']).groupby(['device', 'datacenter']).apply(
    #     lambda df: df.reset_index(['device', 'datacenter'], drop=True).resample('30T').first()
    # ).interpolate('linear'),
    # df.set_index(['device', 'datacenter', 'time']).groupby(['device', 'datacenter']).apply(
    #     # lambda df: df.reset_index(['device', 'datacenter'], drop=True).resample('30T').apply(
    #     #     lambda s: concat([s, s])
    #     # )
    #     lambda df: df.reset_index(['device', 'datacenter'], drop=True).resample('30T').aggregate(
    #         lambda s: s.mean()
    #     )
    # ),
)
Let’s take a look at a computation involving a rolling window:
from data import big_df as df
from pandas import Grouper, to_datetime
from pandas.tseries.offsets import Day, CustomBusinessDay
cbd = CustomBusinessDay(holidays=to_datetime(['2020-01-07']))
df = df.set_index(['device', 'datacenter', 'time']).sort_index()
print(
    # df.rolling(5, min_periods=1).mean(),
    # df.groupby(['device', 'datacenter']).transform(
    #     lambda g: g.rolling(5, min_periods=1).mean()
    # ),
    # df.groupby(['device', 'datacenter', Grouper(level='time', freq='D')]).mean(),
    df.groupby(['device', 'datacenter'])['temperature'].transform(
        lambda g: g.droplevel(['device', 'datacenter']).rolling(Day(3), min_periods=1).mean()
    ),
    # df['time'].min(), df['time'].max(),
    # df.sample(5),
    # df.set_index(['device', 'datacenter', 'time'])
    #   .groupby(['device', 'datacenter', Grouper(level='time', freq='D')])['price'].last()
    #   .groupby(['device', 'datacenter']).transform(
    #       lambda df: df.rolling(7, min_periods=1).mean()
    #   ),
    # df[
    #     (df['time'].dt.date != to_datetime('2021-05-31').date())  # Memorial Day
    #     & (df['time'].dt.weekday < 5)                             # Mon ~ Fri
    # ],
    # df[
    #     (df['time'].dt.date != to_datetime('2021-05-31').date())  # Memorial Day
    #     & (df['time'].dt.weekday < 5)                             # Mon ~ Fri
    # ].set_index(['device', 'datacenter', 'time'])
    #  .groupby(['device', 'datacenter', Grouper(level='time', freq='D')])['temperature'].last()
    #  .groupby(['device', 'datacenter']).transform(
    #      lambda df: df.reset_index(['device', 'datacenter'], drop=True).rolling(Day(3), min_periods=1).mean()
    #  ),
    # df.set_index(['device', 'datacenter', 'time'])
    #   # .groupby(['device', 'datacenter', Grouper(level='time', freq='D')])['temperature'].last()
    #   .groupby(['device', 'datacenter']).transform(
    #       lambda df: df.droplevel(['device', 'datacenter']).rolling(cbd, min_periods=1).mean()
    #   ),
)
Consider that any rolling-window operation can be conceived of as a discrete convolution with a square (uniform) “kernel.”
from data import df
from numpy import ones, convolve
for _, grp in df.groupby(['device', 'datacenter']):
    break
print(
    # grp['temperature'],
    # grp['temperature'].rolling(3).mean().dropna(),
    # convolve(ones(3) / 3, grp['temperature'])[2:-2],
)
What about non-square “kernels”?
from scipy.signal.windows import triang, gaussian
from numpy import convolve
from data import df
for _, grp in df.groupby(['device', 'datacenter']):
    break
print(
    # triang(3),
    # gaussian(3, std=1),
    convolve(grp['temperature'], triang(3)),
)
from data import df
for _, grp in df.groupby(['device', 'datacenter']):
    break
print(
    grp['temperature'].rolling(3, win_type='triang', min_periods=1).mean(),
)
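A hedged sanity check: pandas normalises the win_type weights when computing .mean(), so the result should match a convolution with the normalised kernel (dropping the partial windows at the edges):
from numpy import convolve, allclose
from scipy.signal.windows import triang
from data import df
for _, grp in df.groupby(['device', 'datacenter']):
    break
w = triang(3) / triang(3).sum()  # normalised triangular weights
print(allclose(
    grp['temperature'].rolling(3, win_type='triang').mean().dropna(),
    convolve(grp['temperature'], w, mode='valid'),
))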
from pandas import Series
from pandas.api.indexers import FixedForwardWindowIndexer
s = Series(range(10))
window = FixedForwardWindowIndexer(window_size=3)
print(
    # window.get_window_bounds(s.size),
    # s.rolling(3).mean(),
    s.rolling(window).mean(),
)
from pandas.api.indexers import BaseIndexer
from functools import wraps
from numpy import arange
from pandas import Series
class Window(BaseIndexer):
    @wraps(BaseIndexer.get_window_bounds)
    def get_window_bounds(self, num_values, *_, **__):
        # each window starts at its own row and ends at min(2**i, num_values)
        start = arange(num_values)
        return start, (2 ** start.astype('O')).clip(0, num_values).astype(int)

s = Series(range(100))
window = Window()
print(
    # window.get_window_bounds(s.size),
    s.rolling(window).mean(),
)
from pandas import to_datetime, date_range, Series, Grouper
from numpy import arange, array
from pandas.tseries.offsets import CustomBusinessDay
from pandas.api.indexers import VariableOffsetWindowIndexer
from dataclasses import dataclass
from functools import wraps
from data import big_df as df
@dataclass
class Window(VariableOffsetWindowIndexer):
    index_array: None
    window_size: None
    cbd = CustomBusinessDay(holidays=to_datetime(['2021-01-07']))

    @wraps(VariableOffsetWindowIndexer.get_window_bounds)
    def get_window_bounds(self, num_values, min_periods=None, *_, **__):
        indices = Series(arange(num_values), index=self.index_array)
        start = (
            indices.groupby(Grouper(freq=self.cbd)).transform(lambda s: s[0])
                   .rolling(self.window_size, min_periods=min_periods).agg(lambda s: s[0]).fillna(0).astype(int)
        )
        stop = (
            indices.groupby(Grouper(freq=self.cbd)).transform(lambda s: s[-1])
                   .rolling(self.window_size, min_periods=min_periods).agg(lambda s: s[-1]).fillna(0).astype(int)
        )
        return start.values, stop.values

for _, grp in df.groupby(['device', 'datacenter']):
    break
window = Window(index_array=grp['time'], window_size=1)
print(
    # window.get_window_bounds(grp['temperature'].size),
    # grp['temperature'].rolling(window, min_periods=1).mean(),
)
from pandas import Series
from numpy import arange
s = Series(range(100))
alpha = .1
print(
    # (s * (factors := (1 - alpha)**arange(s.size, 0, -1))).cumsum()
    # / factors.cumsum(),
    # s.ewm(alpha=alpha).mean(),
    # s.expanding(min_periods=1).agg(
    #     lambda s: (s * (factors := (1 - alpha)**arange(s.size, 0, -1))).sum()
    #     / factors.sum()
    # ),
)
from data import big_df as df
for _, grp in df.groupby(['device', 'datacenter']):
    break
# alpha = 1 - exp(-ln(2) / halflife)
print(
    # grp['temperature'].ewm(halflife='2 days', times=grp['time']).mean(),
    # grp.set_index('time')['temperature'].pipe(
    #     lambda df: df.ewm(halflife='2 days', times=df.index).mean()
    # ),
    # df.set_index(['device', 'datacenter', 'time'])['temperature']
    #   .pipe(lambda df: df.ewm(halflife='3 days',
    #                           times=df.index.get_level_values('time')).mean())
    #   .reset_index(),  # .iloc[2 * 24 * 31 - 3: 2 * 24 * 31 + 3],
    # df.iloc[2 * 24 * 31 - 3: 2 * 24 * 31 + 3],
    # df.set_index(['device', 'datacenter', 'time'])['temperature']
    #   .groupby(['device', 'datacenter']).transform(
    #       lambda s: s.ewm(halflife='3 days', times=s.index.get_level_values('time')).mean()
    #   ).reset_index(),  # .iloc[2 * 24 * 31 - 3: 2 * 24 * 31 + 3],
)
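A hedged check of the halflife/alpha relationship noted in the comment above, on a toy series:
from numpy import exp, log, allclose
from pandas import Series
s = Series(range(20), dtype=float)
halflife = 2.0
print(allclose(
    s.ewm(halflife=halflife).mean(),
    s.ewm(alpha=1 - exp(-log(2) / halflife)).mean(),
))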
Let’s talk a bit about performance.
from data import big_df as df, timed
from data import huge_df as df
with timed('df[column].apply(…)'):
    print(
        df['price'].apply(lambda x: x * 100),
        # df['price'] * 100,
    )
from data import big_df as df, timed
from data import huge_df as df
from pandas import Grouper
df = df.set_index(['device', 'datacenter', 'time']).sort_index()['temperature']
with timed('df.groupby(…).apply(…)'):
    print(
        # df.groupby(['device', 'datacenter']).mean(),
        # df.groupby(['device', 'datacenter']).agg('mean'),
        # df.groupby(['device', 'datacenter']).apply(lambda df: df.mean()),
        # df.groupby(['device', 'datacenter', Grouper(level='time', freq='1D')]).apply(lambda df: df.mean()),
        # df.groupby(['device', 'datacenter', Grouper(level='time', freq='1D')]).mean(),
    )
from data import big_df as df, timed
# from data import huge_df as df
from pandas import Series
from xarray import DataArray
from numpy import arange
df = df.set_index(['device', 'datacenter'])[['temperature']]
with timed('rolling & expanding'):
    print(
        # df.groupby(level=['device', 'datacenter']).transform(
        #     lambda s: s.rolling(3, min_periods=1).mean()
        # ),
        # df.groupby(level=['device', 'datacenter']).transform(
        #     lambda s: s.rolling(3, min_periods=1).apply(lambda g: g.mean())
        # ),
        # df.groupby(level=['device', 'datacenter']).transform(
        #     lambda s: s.expanding().apply(lambda g: g.mean())
        # ),
        # df.groupby(level=['device', 'datacenter']).transform(
        #     lambda s: s.expanding().mean()
        # ),
        # df.groupby(level=['device', 'datacenter']).transform(
        #     lambda s: s.expanding().apply(lambda g: g.mean(), raw=True)
        # ),
        # ((da := DataArray(df['temperature'].values.reshape(len(df.index.levels[0]), len(df.index.levels[1]), -1),
        #                   dims=[*df.index.names, 'time']))
        #  .cumsum(dim='time') / arange(1, da.sizes['time'] + 1)),
        Series(((da := DataArray(df['temperature'].values.reshape(len(df.index.levels[0]), len(df.index.levels[1]), -1),
                                 dims=[*df.index.names, 'time']))
                .cumsum(dim='time') / arange(1, da.sizes['time'] + 1)).data.ravel(),  # da.sizes: the 'time' dim has no coords, so use its size
               index=df.index).sort_index(),
    )
from numpy import arange
from numpy.lib.stride_tricks import as_strided
def rolling(arr, size):
    # sliding-window view: one extra axis of length `size`, no copying
    return as_strided(arr, shape=(arr.shape[0] - size + 1, size, *arr.shape[1:]),
                      strides=(arr.strides[0], *arr.strides))

xs = arange(10)
ys = rolling(xs, size=3)
print(
    # xs,
    ys,
)
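A quick self-contained check that row-wise means over the strided view reproduce pandas’ rolling mean (pandas emits NaN for the first size-1 rows, hence the dropna):
from numpy import arange
from numpy.lib.stride_tricks import as_strided
from pandas import Series
def rolling(arr, size):
    return as_strided(arr, shape=(arr.shape[0] - size + 1, size, *arr.shape[1:]),
                      strides=(arr.strides[0], *arr.strides))
xs = arange(10)
print(rolling(xs, size=3).mean(axis=1))              # window means via the view
print(Series(xs).rolling(3).mean().dropna().values)  # same values via pandas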
from data import big_df as df, timed
# from data import huge_df as df
from xarray import DataArray
from numpy.lib.stride_tricks import as_strided
def rolling(arr, size):
    # windows along the last axis: shape (..., n - size + 1, size), zero-copy
    return as_strided(arr, shape=(*arr.shape[:-1], arr.shape[-1] - size + 1, size),
                      strides=(*arr.strides, arr.strides[-1]))

df = df.set_index(['device', 'datacenter'])[['temperature']]
with timed('rolling'):
    print(
        # df.groupby(['device', 'datacenter']).transform(
        #     lambda s: s.rolling(3).mean()
        # ).dropna(),
        # (da := DataArray(
        #     rolling(
        #         df['temperature'].values.reshape(len(df.index.levels[0]), len(df.index.levels[1]), -1),
        #         size=3,
        #     ),
        #     dims=[*df.index.names, 'time', 'win'],
        #     coords={k: v for k, v in zip(df.index.names, df.index.levels)},
        # )),  # .sel(datacenter='ABC0', device='oywj')
        # (da := DataArray(
        #     rolling(
        #         df['temperature'].values.reshape(len(df.index.levels[0]), len(df.index.levels[1]), -1),
        #         size=3,
        #     ),
        #     dims=[*df.index.names, 'time', 'win'],
        #     coords={k: v for k, v in zip(df.index.names, df.index.levels)},
        # )).mean(dim='win'),
        # df.groupby(['device', 'datacenter']).transform(
        #     lambda s: s.rolling(3).apply(lambda s: s.mean())
        # ).dropna(),
    )
from numpy.ma import masked_array
from numpy import arange, eye
from numpy.lib.stride_tricks import as_strided
def expanding(arr):
    # row k of the strided view is the whole array; the mask hides entries past k
    mask = eye(arr.shape[0]+1).cumsum(axis=1).astype(bool)[1:, :arr.shape[0]]
    return masked_array(as_strided(arr, shape=(*arr.shape, arr.shape[0]),
                                   strides=(0, *arr.strides)),
                        mask=mask)

xs = arange(10)
ys = expanding(xs)
print(
    # xs,
    ys,
    # ys.sum(axis=1),
)
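Reusing xs and ys from above, the masked row means should match pandas’ expanding mean:
from pandas import Series
print(ys.mean(axis=1))                       # masked means over growing prefixes
print(Series(xs).expanding().mean().values)  # same values via pandas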
from data import df, timed
from data import big_df as df
from xarray import DataArray
from numpy.lib.stride_tricks import as_strided
from numpy.ma import masked_array
from numpy import arange, eye, tile
def expanding(arr):
    # broadcast the 2-D prefix mask across the leading axes; tiling with
    # (*arr.shape[:-1], 1, 1) keeps each (n, n) mask block intact
    mask = tile(
        eye(arr.shape[-1]+1).cumsum(axis=1).astype(bool)[1:, :-1],
        (*arr.shape[:-1], 1, 1),
    )
    return masked_array(as_strided(arr, shape=(*arr.shape, arr.shape[-1]),
                                   strides=(*arr.strides[:-1], 0, arr.strides[-1])),
                        mask=mask)

df = df.set_index(['device', 'datacenter'])[['temperature']]
with timed('expanding'):
    print(
        # df.groupby(['device', 'datacenter']).transform(
        #     lambda s: s.expanding().agg(lambda s: s.mean())
        # ).tail(),
        # expanding(
        #     df['temperature'].values.reshape(len(df.index.levels[0]), len(df.index.levels[1]), -1),
        # ).mean(axis=-1),
    )