diff options
Diffstat (limited to '')
-rw-r--r-- | datamaps/plugins/dft/master.py | 111 |
1 files changed, 89 insertions, 22 deletions
diff --git a/datamaps/plugins/dft/master.py b/datamaps/plugins/dft/master.py index 744cc27..324b63d 100644 --- a/datamaps/plugins/dft/master.py +++ b/datamaps/plugins/dft/master.py @@ -1,23 +1,23 @@ -import re import datetime import logging +import re import unicodedata from pathlib import Path -from typing import List, Tuple, Iterable, Optional, Any +from typing import Any, Iterable, List, Optional, Tuple +from datamaps.core.temporal import Quarter from datamaps.plugins.dft.portfolio import project_data_from_master from datamaps.process.cleansers import DATE_REGEX_4 -from datamaps.core.temporal import Quarter - from openpyxl import load_workbook -logger = logging.getLogger('bcompiler.utils') +logger = logging.getLogger("bcompiler.utils") class ProjectData: """ ProjectData class """ + def __init__(self, d: dict) -> None: """ :py:func:`OrderedDict` is easiest to get from project_data_from_master[x] @@ -37,7 +37,7 @@ class ProjectData: data = [item for item in self._data.items() if key in item[0]] if not data: raise KeyError("Sorry, there is no matching data") - return (data) + return data def pull_keys(self, input_iter: Iterable, flat=False) -> List[Tuple[Any, ...]]: """ @@ -46,19 +46,54 @@ class ProjectData: """ if flat is True: # search and replace troublesome EN DASH character - xs = [item for item in self._data.items() - for i in input_iter if item[0].strip().replace(unicodedata.lookup('EN DASH'), unicodedata.lookup('HYPHEN-MINUS')) == i] + xs = [ + item + for item in self._data.items() + for i in input_iter + if item[0] + .strip() + .replace( + unicodedata.lookup("EN DASH"), unicodedata.lookup("HYPHEN-MINUS") + ) + == i + ] xs = [_convert_str_date_to_object(x) for x in xs] - ts = sorted(xs, key=lambda x: input_iter.index(x[0].strip().replace(unicodedata.lookup('EN DASH'), unicodedata.lookup('HYPHEN-MINUS')))) + ts = sorted( + xs, + key=lambda x: input_iter.index( + x[0] + .strip() + .replace( + unicodedata.lookup("EN DASH"), + unicodedata.lookup("HYPHEN-MINUS"), + ) + ), + ) ts = [item[1] for item in ts] return ts else: - xs = [item for item in self._data.items() - for i in input_iter if item[0].replace(unicodedata.lookup('EN DASH'), unicodedata.lookup('HYPHEN-MINUS')) == i] - xs = [item for item in self._data.items() - for i in input_iter if item[0] == i] + xs = [ + item + for item in self._data.items() + for i in input_iter + if item[0].replace( + unicodedata.lookup("EN DASH"), unicodedata.lookup("HYPHEN-MINUS") + ) + == i + ] + xs = [ + item for item in self._data.items() for i in input_iter if item[0] == i + ] xs = [_convert_str_date_to_object(x) for x in xs] - ts = sorted(xs, key=lambda x: input_iter.index(x[0].replace(unicodedata.lookup('EN DASH'), unicodedata.lookup('HYPHEN-MINUS')))) + ts = sorted( + xs, + key=lambda x: input_iter.index( + x[0].replace( + unicodedata.lookup("EN DASH"), + unicodedata.lookup("HYPHEN-MINUS"), + ) + ), + ) return ts def __repr__(self): @@ -69,7 +104,7 @@ def _convert_str_date_to_object(d_str: tuple) -> Tuple[str, Optional[datetime.da try: if re.match(DATE_REGEX_4, d_str[1]): try: - ds = d_str[1].split('-') + ds = d_str[1].split("-") return (d_str[0], datetime.date(int(ds[0]), int(ds[1]), int(ds[2]))) except TypeError: return d_str @@ -114,12 +149,21 @@ class Master: filename = m1.filename ..etc """ - def __init__(self, quarter: Quarter, path: str) -> None: + + def __init__( + self, quarter: Quarter, path: str, declared_month: Optional[int] = None + ) -> None: self._quarter = quarter + self._declared_month = declared_month self.path = path + if self._declared_month: + idxs = [x.month_int for x in self._quarter.months] + m_idx = idxs.index(self._declared_month) + self.year = self._quarter.months[m_idx].year + else: + self.year = self._quarter.year self._data = project_data_from_master(self.path) self._project_titles = [item for item in self.data.keys()] - self.year = self._quarter.year def __getitem__(self, project_name): return ProjectData(self._data[project_name]) @@ -158,16 +202,37 @@ class Master: return self._quarter @property - def filename(self): - """The filename of the master xlsx file, e.g. ``master_1_2017.xlsx``. + def month(self): """ + Returns the ``Month`` object associated with the ``Master``. + """ + months = { + 1: "January", + 2: "February", + 3: "March", + 4: "April", + 5: "May", + 6: "June", + 7: "July", + 8: "August", + 9: "September", + 10: "October", + 11: "November", + 12: "December", + } + return [ + m for m in self.quarter.months if m.name == months[self._declared_month] + ][0] + + @property + def filename(self): + """The filename of the master xlsx file, e.g. ``master_1_2017.xlsx``.""" p = Path(self.path) return p.name @property def projects(self): - """A list of project titles derived from the master xlsx. - """ + """A list of project titles derived from the master xlsx.""" return self._project_titles def duplicate_keys(self, to_log=None): @@ -194,7 +259,9 @@ class Master: dups.add(x) if to_log and len(dups) > 0: for x in dups: - logger.warning(f"{self.path} contains duplicate key: \"{x}\". Masters cannot contain duplicate keys. Rename them.") + logger.warning( + f'{self.path} contains duplicate key: "{x}". Masters cannot contain duplicate keys. Rename them.' + ) return True elif to_log and len(dups) == 0: logger.info(f"No duplicate keys in {self.path}") |