seminars.fb

Data Analysis & Engineering → “pandas is really good, actually

Seminar (Wed, Nov 18, 2020; 12 PM PST)

Theme: Data Analysis & Engineering

Topic: pandas is really good, actually

Keywords: Python, data analysis, data engineering, numpy, pandas

Presenter James Powell james@dutc.io
Date Wednesday, November 18, 2020
Time 12:00 PM PST
print('Good morning!')
# now, look at some "traces": captured data from a
#    running system used to gain operational insight.

# we have a file of JSON-lines that encode 
#    captured metrics from some multi-node cluster,
#    including memory use, CPU load averages, &c.

# (note: for the purposes of this exercise, we will
#    use random data, so there's no value in reading
#    too much into the data itself)

from datetime import
traces = '''
{ "timestamp": "2020-07-04T09:00:00.000000", "node": "a", "mem": { "total": 1024, "used": 768 }, "swap": { "total": 32, "used": 8 }, "cpu": [ 0.9, 0.7, 0.6 ] }
{ "timestamp": "2020-07-04T09:00:30.000000", "node": "b", "mem": { "total": 1024, "used": 256 }, "swap": { "total": 32, "used": 0 }, "cpu": [ 0.1, 0.1, 0.1 ] }
{ "timestamp": "2020-07-04T09:01:00.000000", "node": "a", "mem": { "total": 1024, "used": 768 }, "swap": { "total": 32, "used": 8 }, "cpu": [ 0.2, 0.7, 0.6 ] }
'''

Data File

Traces file, containing JSON lines data with traces traces.jsonl

Exercise

# TASK: read in traces.jsonl as a stream and identify nodes with the
#       following potential operational issues
#       - 1-min load average >.8 for more than one hour
#       - swap usage >80% for more than five minutes
#       - memory free <10% for more than 2 minute two times in one hour

from collections import namedtuple
from json import loads
from datetime import datetime

Cpu  = namedtuple('Cpu', 'avg_1m avg_5m avg_10m')
Mem  = namedtuple('Mem', 'total used')
class Swap(namedtuple('Swap','total used')):
    @property
    def free(self):
        return self.total - self.used

class Trace(namedtuple('TraceBase', 'timestamp node cpu mem swap')):
    @classmethod
    def from_json(cls, json):
        timestamp = datetime.fromisoformat(json['timestamp'])
        node = json['node']
        cpu = Cpu(*json['cpu'])
        mem = Mem(**json['mem'])
        swap = Swap(**json['swap'])
        return cls(timestamp, node, cpu, mem, swap)

with open('traces.jsonl') as f:
    traces = [Trace.from_json(loads(line)) for line in f]

#  print(f'{traces[0].cpu.avg_1m = }')
from pandas import DataFrame, MultiIndex, concat
df = DataFrame(traces)
df.set_index('node', inplace=True)
df.set_index('timestamp', inplace=True, append=True)
df.sort_index(inplace=True)

for idx, col in enumerate(['1T', '5T', '10T']):
    df['cpu', col] = df['cpu'].str[idx]
del df['cpu']

for stat in {'mem', 'swap'}:
    for idx, col in enumerate(['total', 'usage']):
        df[stat, col] = df[stat].str[idx]
    df[stat, 'free'] = df[stat, 'total'] - df[stat, 'usage']
    df[stat, 'pct']  = df[stat, 'usage'] / df[stat, 'total']
    del df[stat]

df.columns = MultiIndex.from_tuples(df.columns)

df['warning', 'cpu']  = df['cpu',   '1T'] > 0.80
df['warning', 'swap'] = df['swap', 'pct'] > 0.80

per_node = (df.loc[n].resample('1T').first().reset_index().assign(node=n)
            for n in df.index.get_level_values(0).unique())
df = concat(per_node)
df.set_index(['node', 'timestamp'], inplace=True)
df.sort_index(inplace=True)

for col in (w := df['warning'].rolling(60, min_periods=1).sum() == 60).columns:
    df['flag', col] = w[col]

from collections import namedtuple
class Flagged(namedtuple('Flagged', 'df cpu swap')):
    @classmethod
    def from_df(cls, df):
        cpu = df['flag', 'cpu'].groupby(level=0).any()
        swap = df['flag', 'swap'].groupby(level=0).any()
        return cls(df, cpu, swap)

flagged = Flagged.from_df(df)
for node in flagged.cpu[flagged.cpu].index:
    print(flagged.df[flagged.df['flag', 'cpu']].loc[node][['cpu', 'flag']].head())

#  print(df.loc['a'].sample(3)[['cpu', 'warning', 'flag']])
#  print(df.loc['a'].sample(3)[['cpu', 'warning']])