# seminars.fb  (stray artifact line — commented out so the module parses; note the shebang below is only effective on line 1)

#!/usr/bin/env python3
"""
This file is used to connect to ODS and query data

Usage:
async with Ods.connect() as ods:
    entity = ["fsw005.p071.f01.ftw5"]
    keys = ["sensors.util.PSU1_IN_POWER"]
    start = (date.today() - timedelta(days=1)).timestamp()
    query = ods.SimpleQuery(entity, keys, start)
    results = await ods.submit(query)
"""

import datetime
import sys
from collections import namedtuple
from logging import getLogger
from typing import Any, List, NamedTuple

from hw_eng.hw_insights.core.hwi_clients import get_rapido_client
from hw_eng.hw_insights.core.hwi_helpers import time_ago
from IPython import embed
from pandas import DataFrame, MultiIndex, Series, concat, to_datetime
from rapido.asyncio.Rapido.ttypes import RapidoQueryParams


# Route breakpoint() into an interactive IPython shell.
# NOTE(review): debug scaffolding — consider removing before production use.
sys.breakpointhook = embed

# pyre-ignore-all-errors


# Module-level logger. Fixed: the original used `_name_` (single
# underscores), which raises NameError at import time; the dunder
# `__name__` is the conventional logger name.
logger = getLogger(__name__)


class Ods:
    """Thin async-context-manager facade for querying ODS via the rapido client.

    Usage (see module docstring):
        async with Ods.connect() as ods:
            query = ods.SimpleQuery(entities, keys, start, end)
            results = await ods.submit(query)
    """

    async def __aenter__(self):
        # No connection is actually opened here; the rapido client is
        # acquired per-call inside submit().
        logger.info("Connecting to ODS")
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Parameter order fixed to the protocol's (exc_type, exc_value,
        # traceback); the original declared (exc, exc_type, tb), which was
        # misleading (harmless only because they are passed positionally).
        logger.info("Disconnecting from ODS")

    @classmethod
    def connect(cls):
        """Return a fresh Ods instance (readable alias for the constructor)."""
        return cls()

    async def submit(self, query: "SimpleQuery", quantiles: List = None) -> Any:
        """Run an ODS query through the rapido client.

        Args:
            query: a SimpleQuery/RawQuery describing entities, keys and window.
            quantiles: optional quantiles forwarded to SimpleQuery processing.

        Returns:
            Whatever query.run_with_client() produces: a DataFrame, or a
            (DataFrame, DataFrame) pair when quantiles are given.
            #TODO: always return a dataframe
        """
        with await get_rapido_client() as client:
            rv = await query.run_with_client(client, quantiles=quantiles)
        return rv

    class RawQuery(
        namedtuple("SimpleQueryBase", "entities keys start end transform reduce")
    ):
        def __new__(
            cls,
            entities: List,
            keys: List,
            start: datetime.datetime,
            end: datetime.datetime = None,
            *,
            transform: List = None,
            reduce: List = None,
        ) -> NamedTuple:
            """Build an immutable description of an ODS query.

            Args:
                entities: entities (switches) to query.
                keys: ODS keys to query for each entity.
                start: timestamp for the start of the query window.
                end: timestamp for the end of the query window.
                transform: transforms applied to individual entity data.
                reduce: reductions applied across the set of timeseries.

            Returns:
                Named tuple of the normalized arguments.
            """
            # None sentinels replace the original mutable-default lists
            # (`= []` is shared across calls); behavior is unchanged because
            # the tuple fields were never mutated in place.
            return super().__new__(
                cls,
                entities,
                keys,
                start,
                end,
                [] if transform is None else transform,
                [] if reduce is None else reduce,
            )

        def batch(self, maxsize: int = 5_000_000):
            """Split this query into sub-queries of bounded estimated size.

            Size model (assumptions carried over from the original notes):
              - uniform fidelity: ~96 values per key per day
                (TODO: <7 days is 96/day, beyond that ODS drops to 24/day —
                the coarser fidelity is not modeled here)
              - uniform across entities (some entities may return zero values)

            Args:
                maxsize: target maximum number of datapoints per sub-query.

            Yields:
                Instances of type(self) covering disjoint entity chunks with
                the same keys and time window.
            """
            # (Rewritten: the original draft referenced bare `end`/`start`/
            # `keys`, overwrote entities with random strings, printed the
            # batches instead of returning them, and hit a breakpoint().)
            from itertools import groupby

            days = max(1, (self.end - self.start).days)
            # Entities per chunk so that entities * keys * days * 96 <= maxsize,
            # clamped to at least one entity per chunk.
            per_chunk = max(1, maxsize // max(1, len(self.keys)) // days // 96)
            chunks = groupby(
                enumerate(self.entities), lambda pair: pair[0] // per_chunk
            )
            for _, members in chunks:
                yield type(self)(
                    [entity for _, entity in members],
                    self.keys,
                    self.start,
                    self.end,
                )

        @property
        def as_rapido(self) -> "RapidoQueryParams":
            """Repackage this query as the RapidoQueryParams the client expects.

            Returns:
                The populated RapidoQueryParams object passed to ods(rapido).
            """
            query = RapidoQueryParams()
            query.entity_desc = ",".join(self.entities)
            query.key_desc = ",".join(self.keys)
            # NOTE(review): transform_desc gets the raw list while reduce_desc
            # is comma-joined — confirm this asymmetry is what rapido expects.
            query.transform_desc = self.transform
            query.reduce_desc = ",".join(self.reduce)
            query.start_time = time_ago(to_datetime(self.start).timestamp())
            query.end_time = time_ago(to_datetime(self.end).timestamp())
            return query

        async def run_with_client(self, client: Any, *args, **kwargs) -> Any:
            """Submit this query, batched, through an ODS client.

            Args:
                client: ODS client object (from get_rapido_client()).

            Returns:
                List of raw ODS responses, one per sub-query batch.
            """
            # Fixed: the original referenced an undefined `query` and a
            # non-existent `batched` method; it batches *this* query.
            return [
                await client.queryNumeric({"0": sub.as_rapido})
                for sub in self.batch(maxsize=10000)
            ]

    class SimpleQuery(RawQuery):
        async def run_with_client(
            self, client: Any, quantiles: List, *args, **kwargs
        ) -> DataFrame:
            """Run the query and shape the ODS responses into a DataFrame.

            Args:
                client: ODS client object.
                quantiles: quantiles to compute per entity, or falsy for none.

            Returns:
                DataFrame indexed by (timestamp, ods_entity); when quantiles
                are requested, a (raw_df, quantile_df) pair instead.
            """
            ods_results = await super().run_with_client(client, *args, **kwargs)
            df_results = []
            # super() returns a *list* of responses (one per batch); each
            # response presumably maps query-id -> entity -> key -> {ts: value}
            # (TODO confirm against the rapido client). The original called
            # .items() on the list itself, which raises AttributeError.
            for response in ods_results:
                for _, query_result in response.items():
                    for entity, results in query_result.items():
                        values = [
                            Series(data, name=key) for key, data in results.items()
                        ]
                        for series in values:
                            series.index = to_datetime(series.index, unit="s")
                            series.index = MultiIndex.from_arrays(
                                [[entity] * len(series), series.index],
                                names=["ods_entity", "timestamp"],
                            )
                        df_results.append(concat(values, axis=1))
            df = concat(df_results)
            df.index = df.index.reorder_levels([1, 0])
            # Fixed: the original assigned the sorted index back without
            # reordering the rows (df.index, *_ = df.index.sortlevel(0)),
            # which scrambles the label-to-row mapping.
            df = df.sort_index(level=0)
            # ODS key names contain ':' '.' '-' which are awkward as column
            # names; normalize them.
            df.columns = [
                column.replace(":", "").replace(".", "").replace("-", "_")
                for column in df.columns
            ]
            if not quantiles:
                return df
            proc_df = df.groupby("ods_entity").quantile(quantiles).unstack()
            proc_df.columns = [
                f"{column[0]}_{column[1] * 100:.0f}" for column in proc_df.columns
            ]
            proc_df["timestamp"] = self.end
            return df, proc_df.reset_index().set_index(["timestamp", "ods_entity"])

if __name__ == "__main__":
    import asyncio
    from datetime import date, timedelta

    async def _main() -> None:
        """Ad-hoc driver: one day of SCM inlet temps for Minipack chassis."""
        # NOTE(review): `query_date` was undefined in the original; using
        # today's date, matching the module-docstring example's intent.
        query_date = date.today()
        # NOTE(review): get_from_hive is not imported anywhere in this file —
        # this driver cannot run until that import is restored.
        data = get_from_hive("dc_graph_devices", partition=query_date)
        entities = list(
            data[data["model_obj_model"] == "MINIPACK_CHASSIS_BUNDLE"]["hostname"]
        )
        keys = ["sensors.util.SCM_INLET_REMOTE_TEMP"]
        start = query_date - timedelta(days=1)
        end = query_date
        quantiles = [0.01, 0.50, 0.90, 0.95, 0.99]
        async with Ods.connect() as ods:
            query = ods.SimpleQuery(entities, keys, start, end)
            rich_df, proc_df = await ods.submit(query, quantiles=quantiles)

    # The original used `async with` / `await` at module level (a SyntaxError);
    # wrap the body in a coroutine and drive it with asyncio.run().
    asyncio.run(_main())
    # Open question from the original scratch notes ("# batch= / True / False"):
    # should submit() always batch, or take a batch= flag?

# NOTE(review): unfinished design sketch — `batch` and `self` are undefined at
# module scope, so calling this raises NameError. It duplicates the intent of
# Ods.RawQuery.batch; confirm whether it should be deleted or finished.
def submit(query):
    if batch:  # presumably a flag deciding whether to split the query — TODO confirm
        subqueries = self.batch(query)
    ...


# assumption…
#  → always follow the assumption
#  → never follow the assumption
#     → do the thing
#     → don't do the thing
from collections import namedtuple
class Query(namedtuple('Query', 'a b c')):
    @property
    def batched(self):
        for ... in ...:
            yield type(self)(batch, self.b, self.c