“pandas is really good, actually”

Theme: Data Analysis & Engineering
Topic: “pandas is really good, actually”
Keywords: Python, data analysis, data engineering, numpy, pandas
Presenter: James Powell (james@dutc.io)
Date: Wednesday, November 18, 2020
Time: 12:00 PM PST
print('Good morning!')
# now, look at some "traces": captured data from a
# running system used to gain operational insight.
# we have a file of JSON-lines that encode
# captured metrics from some multi-node cluster,
# including memory use, CPU load averages, &c.
# (note: for the purposes of this exercise, we will
# use random data, so there's no value in reading
# too much into the data itself)
from datetime import datetime
traces = '''
{ "timestamp": "2020-07-04T09:00:00.000000", "node": "a", "mem": { "total": 1024, "used": 768 }, "swap": { "total": 32, "used": 8 }, "cpu": [ 0.9, 0.7, 0.6 ] }
{ "timestamp": "2020-07-04T09:00:30.000000", "node": "b", "mem": { "total": 1024, "used": 256 }, "swap": { "total": 32, "used": 0 }, "cpu": [ 0.1, 0.1, 0.1 ] }
{ "timestamp": "2020-07-04T09:01:00.000000", "node": "a", "mem": { "total": 1024, "used": 768 }, "swap": { "total": 32, "used": 8 }, "cpu": [ 0.2, 0.7, 0.6 ] }
'''
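# note: the code further down reads traces.jsonl from disk; the sample string
# above is presumably written to that file first. a minimal sketch of that
# assumed setup step (not shown in the original):
with open('traces.jsonl', 'w') as f:
    f.write(traces.strip() + '\n')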
Traces file traces.jsonl, containing JSON-lines data with traces. Each object has fields timestamp, node, mem, swap, and cpu:
- timestamp: datetime in datetime.isoformat() format
- node: string node name
- mem: object with fields total and used, with memory usage
- swap: object with fields total and used, with swap usage
- cpu: array with three numbers (1-min, 5-min, 10-min load averages)
# TASK: read in traces.jsonl as a stream and identify nodes with the
# following potential operational issues
# - 1-min load average >0.8 for more than one hour
# - swap usage >80% for more than five minutes
# - memory free <10% for more than 2 minutes, two times in one hour
from collections import namedtuple
from json import loads
from datetime import datetime
Cpu = namedtuple('Cpu', 'avg_1m avg_5m avg_10m')
Mem = namedtuple('Mem', 'total used')
class Swap(namedtuple('Swap', 'total used')):
    @property
    def free(self):
        return self.total - self.used
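# e.g. Swap(total=32, used=8).free == 24: a plain namedtuple extended with a derived property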
class Trace(namedtuple('TraceBase', 'timestamp node cpu mem swap')):
    @classmethod
    def from_json(cls, json):
        timestamp = datetime.fromisoformat(json['timestamp'])
        node = json['node']
        cpu = Cpu(*json['cpu'])
        mem = Mem(**json['mem'])
        swap = Swap(**json['swap'])
        return cls(timestamp, node, cpu, mem, swap)
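# quick illustrative check of the parsing layer, using the first sample line
# from the traces string above (not part of the original notebook):
first = Trace.from_json(loads(traces.strip().splitlines()[0]))
print(f'{first.node = }, {first.cpu.avg_1m = }, {first.swap.free = }')   # 'a', 0.9, 24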
with open('traces.jsonl') as f:
    traces = [Trace.from_json(loads(line)) for line in f]
# print(f'{traces[0].cpu.avg_1m = }')
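# DataFrame(traces) takes the field names of the Trace namedtuple as column
# names, so df starts with columns timestamp, node, cpu, mem, swap; the cpu,
# mem, and swap columns hold the nested namedtuples as Python objects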
from pandas import DataFrame, MultiIndex, concat
df = DataFrame(traces)
df.set_index('node', inplace=True)
df.set_index('timestamp', inplace=True, append=True)
df.sort_index(inplace=True)
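# rows are now indexed by a (node, timestamp) MultiIndex, sorted so that
# df.loc[node] and the per-node slicing below work; next, the object columns
# are unpacked into one column per namedtuple field, keyed by (stat, field)
# tuples: .str[idx] indexes positionally into each namedtuple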
for idx, col in enumerate(['1T', '5T', '10T']):
    df['cpu', col] = df['cpu'].str[idx]
del df['cpu']
for stat in {'mem', 'swap'}:
    for idx, col in enumerate(['total', 'usage']):
        df[stat, col] = df[stat].str[idx]
    df[stat, 'free'] = df[stat, 'total'] - df[stat, 'usage']
    df[stat, 'pct'] = df[stat, 'usage'] / df[stat, 'total']
    del df[stat]
df.columns = MultiIndex.from_tuples(df.columns)
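# with a proper MultiIndex on the columns, df['cpu'] selects the whole
# 1T/5T/10T sub-frame and df['mem', 'pct'] selects a single column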
df['warning', 'cpu'] = df['cpu', '1T'] > 0.80
df['warning', 'swap'] = df['swap', 'pct'] > 0.80
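# the raw traces are staggered per node (the sample shows 30-second offsets
# between nodes); resampling each node onto a regular 1-minute grid means a
# 60-row window below corresponds to one hour of wall-clock time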
per_node = (df.loc[n].resample('1T').first().reset_index().assign(node=n)
            for n in df.index.get_level_values(0).unique())
df = concat(per_node)
df.set_index(['node', 'timestamp'], inplace=True)
df.sort_index(inplace=True)
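# a rolling 60-sample window over the per-minute warning columns: sum() == 60
# means a warning held for 60 consecutive minutes; the result lands in a
# separate 'flag' column group alongside the raw per-sample 'warning' columns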
for col in (w := df['warning'].rolling(60, min_periods=1).sum() == 60).columns:
    df['flag', col] = w[col]
from collections import namedtuple
class Flagged(namedtuple('Flagged', 'df cpu swap')):
    @classmethod
    def from_df(cls, df):
        cpu = df['flag', 'cpu'].groupby(level=0).any()
        swap = df['flag', 'swap'].groupby(level=0).any()
        return cls(df, cpu, swap)
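# groupby(level=0) groups on the node level of the row MultiIndex, so the cpu
# and swap fields are per-node booleans: True for nodes where the flag ever fired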
flagged = Flagged.from_df(df)
for node in flagged.cpu[flagged.cpu].index:
    print(flagged.df[flagged.df['flag', 'cpu']].loc[node][['cpu', 'flag']].head())
# print(df.loc['a'].sample(3)[['cpu', 'warning', 'flag']])
# print(df.loc['a'].sample(3)[['cpu', 'warning']])
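# symmetric inspection for swap-flagged nodes (illustrative, mirroring the
# cpu loop above; not in the original):
for node in flagged.swap[flagged.swap].index:
    print(flagged.df[flagged.df['flag', 'swap']].loc[node][['swap', 'flag']].head())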