#!/usr/bin/env python3
"""
This file is used to connect to ODS and query data
Usage:
async with Ods.connect() as ods:
entity = ["fsw005.p071.f01.ftw5"]
keys = ["sensors.util.PSU1_IN_POWER"]
start = (date.today() - timedelta(days=1)).timestamp()
query = ods.SimpleQuery(entity, keys, start)
results = await ods.submit(query)
"""
from datetime import date, datetime, timedelta
from collections import namedtuple
from logging import getLogger
from typing import Any, List, NamedTuple, Optional
from hw_eng.hw_insights.core.hwi_clients import get_rapido_client
from hw_eng.hw_insights.core.hwi_helpers import time_ago
from pandas import DataFrame, MultiIndex, Series, concat, to_datetime
from rapido.asyncio.Rapido.ttypes import RapidoQueryParams
# pyre-ignore-all-errors
logger = getLogger(__name__)
class Ods:
async def __aenter__(self):
logger.info("Connecting to ODS")
return self
    async def __aexit__(self, exc_type, exc, tb):
logger.info("Disconnecting from ODS")
@classmethod
def connect(cls):
"""
This functions returns the class
"""
return cls()
    async def submit(
        self, query: "SimpleQuery", quantiles: Optional[List] = None
    ) -> Any:
        """
        Run the ODS query using the Rapido client.
        Args:
            query - a SimpleQuery carrying the fields needed for the ODS query
            quantiles - optional list of quantiles to compute per entity
        Returns:
            rv - the object returned by the Rapido client. # TODO: return dataframe
        """
with await get_rapido_client() as client:
rv = await query.run_with_client(client, quantiles=quantiles)
return rv
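# Hedged usage sketch (a module-level helper, never called at import time):
# shows the intended end-to-end flow through Ods.submit for a single switch
# and ODS key over the last day. The entity and key names are the placeholder
# values from the module docstring, not real hosts.
async def _example_submit() -> DataFrame:
    entity = ["fsw005.p071.f01.ftw5"]
    keys = ["sensors.util.PSU1_IN_POWER"]
    start = date.today() - timedelta(days=1)
    end = date.today()
    async with Ods.connect() as ods:
        # SimpleQuery is defined below; the name is resolved when this runs.
        query = SimpleQuery(entity, keys, start, end)
        return await ods.submit(query)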
class RawQuery(
namedtuple("SimpleQueryBase", "entities keys start end transform reduce")
):
def __new__(
cls,
entities: List,
keys: List,
start: datetime,
        end: Optional[datetime] = None,
*,
transform: List = [],
reduce: List = [],
) -> NamedTuple:
"""
        Accept the fields needed for an ODS query and return them as a named tuple.
Args:
entities - List of entities (switches) to be queried
keys - List of ods keys for the entities to be queried
start - Timestamp for start of data being queried
end - Timestamp for end of data being queried
transform - List of transforms applied to individual entity data
reduce - List of reductions applied to set of timeseries data in query
Returns:
Named tuple of the passed args
"""
return super().__new__(cls, entities, keys, start, end, transform, reduce)
    def batch(self, maxsize: int = 5_000_000):
        """
        Split this query into sub-queries whose expected result size stays
        under ``maxsize`` datapoints.
        Assumptions:
            - uniform fidelity: roughly 96 values per key per day (<7 days),
              24 values per key per day beyond that
            - uniform across entities (some entities will have zero values)
        Yields:
            queries of the same type covering a slice of the entities
        """
        from itertools import groupby

        days = max((self.end - self.start).days, 1)
        size = max(maxsize // len(self.keys) // days // 96, 1)
        groups = (
            [e for _, e in ents]
            for _, ents in groupby(
                enumerate(self.entities), lambda idx_ent: idx_ent[0] // size
            )
        )
        for g in groups:
            yield type(self)(
                g, self.keys, self.start, self.end,
                transform=self.transform, reduce=self.reduce,
            )
@property
def as_rapido(self) -> RapidoQueryParams:
"""
This function return the query object after restructuring
the parameters in a manner needed for ods client
Returns:
query object passes to ods(rapido) client
"""
query = RapidoQueryParams()
query.entity_desc = ",".join(self.entities)
query.key_desc = ",".join(self.keys)
query.transform_desc = self.transform
query.reduce_desc = ",".join(self.reduce)
query.start_time = time_ago(to_datetime(self.start).timestamp())
query.end_time = time_ago(to_datetime(self.end).timestamp())
return query
    async def run_with_client(self, client: Any, *args, **kwargs) -> Any:
        """
        Submit the query (in batches) through the ODS client and return the
        raw responses.
        Args:
            client - ODS (Rapido) client object
        Returns:
            rv - mapping of query id to the data returned by the ODS client
        """
        rv = {}
        for i, q in enumerate(self.batch(maxsize=10000)):
            response = await client.queryNumeric({"0": q.as_rapido})
            # Re-key each per-batch response so batches do not collide.
            rv.update({f"{i}.{key}": value for key, value in response.items()})
        return rv
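# Minimal sketch of how RawQuery.batch partitions a query (assuming the
# ~96-samples-per-key-per-day fidelity noted in batch()). The host names are
# hypothetical; nothing here talks to ODS.
def _example_batching() -> None:
    entities = [f"fsw{i:03d}.p071.f01.ftw5" for i in range(10)]  # hypothetical hosts
    q = RawQuery(
        entities,
        ["sensors.util.PSU1_IN_POWER"],
        datetime.now() - timedelta(days=2),
        datetime.now(),
    )
    # size = 600 // 1 key // 2 days // 96 = 3 entities per sub-query
    for sub in q.batch(maxsize=600):
        print(len(sub.entities), sub.entities)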
class SimpleQuery(RawQuery):
    async def run_with_client(
        self, client: Any, quantiles: Optional[List] = None, *args, **kwargs
    ) -> DataFrame:
        """
        Run the query through the ODS client and return the response as a DataFrame.
        Args:
            client - ODS (Rapido) client object
            quantiles - optional list of quantiles to compute per entity
        Returns:
            df - DataFrame with the queried ODS data; if quantiles are given,
            a (df, proc_df) tuple where proc_df holds the per-entity quantiles
        """
ods_results = await super().run_with_client(client, *args, **kwargs)
df_results = []
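        # Reshape the nested {query_id: {entity: {key: series}}} response into
        # one DataFrame per entity, indexed by (ods_entity, timestamp).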
for _, query_result in ods_results.items():
for entity, results in query_result.items():
values = [Series(data, name=key) for key, data in results.items()]
for v in values:
v.index = to_datetime(v.index, unit="s")
v.index = MultiIndex.from_arrays(
[[entity] * len(v), v.index],
names=["ods_entity", "timestamp"],
)
df_results.append(concat(values, axis=1))
df = concat(df_results)
df.index = df.index.reorder_levels([1, 0])
df.index, *_ = df.index.sortlevel(0)
df.columns = [
column.replace(":", "").replace(".", "").replace("-", "_")
for column in df.columns
]
if not quantiles:
return df
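        # Per-entity quantiles: rows collapse to one per entity and the
        # flattened columns become "<key>_<percentile>", e.g. "..._95".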
proc_df = df.groupby("ods_entity").quantile(quantiles).unstack()
proc_df.columns = [
f"{column[0]}_{column[1]*100:.0f}" for column in proc_df.columns
]
proc_df["timestamp"] = self.end
return df, proc_df.reset_index().set_index(["timestamp", "ods_entity"])
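# Hedged illustration with tiny synthetic data of the quantile post-processing
# SimpleQuery.run_with_client performs: group per entity, take quantiles, and
# flatten the resulting columns into "<key>_<percentile>" names. The dates,
# entities, and values below are made up for illustration only.
def _example_quantile_columns() -> DataFrame:
    idx = MultiIndex.from_product(
        [to_datetime(["2023-01-01", "2023-01-02"]), ["swA", "swB"]],
        names=["timestamp", "ods_entity"],
    )
    df = DataFrame({"sensorsutilPSU1_IN_POWER": [1.0, 2.0, 3.0, 4.0]}, index=idx)
    proc = df.groupby("ods_entity").quantile([0.5, 0.9]).unstack()
    # Columns come back as (key, quantile) tuples; flatten to "key_50", "key_90".
    proc.columns = [f"{col}_{q * 100:.0f}" for col, q in proc.columns]
    return proc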
async def main() -> None:
    # NOTE: get_from_hive and query_date are assumed to be provided by the
    # surrounding tooling; their imports/definitions are not shown in this module.
    data = get_from_hive("dc_graph_devices", partition=query_date)
    entities = list(data[data["model_obj_model"] == "MINIPACK_CHASSIS_BUNDLE"]["hostname"])
    keys = ["sensors.util.SCM_INLET_REMOTE_TEMP"]
    start = query_date - timedelta(days=1)
    end = query_date
    quantiles = [0.01, 0.50, 0.90, 0.95, 0.99]
    async with Ods.connect() as ods:
        query = SimpleQuery(entities, keys, start, end)
        rich_df, proc_df = await ods.submit(query, quantiles=quantiles)

if __name__ == "__main__":
    import asyncio

    asyncio.run(main())