aboutsummaryrefslogtreecommitdiffstats
path: root/datamaps/core
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--datamaps/core/__init__.py3
-rw-r--r--datamaps/core/master.py208
-rw-r--r--datamaps/core/row.py84
-rw-r--r--datamaps/core/temporal.py121
4 files changed, 416 insertions, 0 deletions
diff --git a/datamaps/core/__init__.py b/datamaps/core/__init__.py
new file mode 100644
index 0000000..305208d
--- /dev/null
+++ b/datamaps/core/__init__.py
@@ -0,0 +1,3 @@
+from .row import Row
+from .temporal import Quarter, FinancialYear
+from .master import Master, ProjectData
diff --git a/datamaps/core/master.py b/datamaps/core/master.py
new file mode 100644
index 0000000..1fbfe90
--- /dev/null
+++ b/datamaps/core/master.py
@@ -0,0 +1,208 @@
+import re
+import datetime
+import logging
+import unicodedata
+from pathlib import Path
+from typing import List, Tuple, Iterable, Optional, Any
+
+from ..utils import project_data_from_master
+from ..process.cleansers import DATE_REGEX_4
+from .temporal import Quarter
+
+from openpyxl import load_workbook
+
+logger = logging.getLogger('bcompiler.utils')
+
+
+class ProjectData:
+ """
+ ProjectData class
+ """
+ def __init__(self, d: dict) -> None:
+ """
+ :py:func:`OrderedDict` is easiest to get from project_data_from_master[x]
+ """
+ self._data = d
+
+ def __len__(self) -> int:
+ return len(self._data)
+
+ def __getitem__(self, item):
+ return self._data[item]
+
+ def key_filter(self, key: str) -> List[Tuple]:
+ """
+ Return a list of (k, v) tuples if k in master key.
+ """
+ data = [item for item in self._data.items() if key in item[0]]
+ if not data:
+ raise KeyError("Sorry, there is no matching data")
+ return (data)
+
+ def pull_keys(self, input_iter: Iterable, flat=False) -> List[Tuple[Any, ...]]:
+ """
+ Returns a list of (key, value) tuples from ProjectData if key matches a
+ key. The order of tuples is based on the order of keys passed in the iterable.
+ """
+ if flat is True:
+ # search and replace troublesome EN DASH character
+ xs = [item for item in self._data.items()
+ for i in input_iter if item[0].strip().replace(unicodedata.lookup('EN DASH'), unicodedata.lookup('HYPHEN-MINUS')) == i]
+ xs = [_convert_str_date_to_object(x) for x in xs]
+ ts = sorted(xs, key=lambda x: input_iter.index(x[0].strip().replace(unicodedata.lookup('EN DASH'), unicodedata.lookup('HYPHEN-MINUS'))))
+ ts = [item[1] for item in ts]
+ return ts
+ else:
+ xs = [item for item in self._data.items()
+ for i in input_iter if item[0].replace(unicodedata.lookup('EN DASH'), unicodedata.lookup('HYPHEN-MINUS')) == i]
+ xs = [item for item in self._data.items()
+ for i in input_iter if item[0] == i]
+ xs = [_convert_str_date_to_object(x) for x in xs]
+ ts = sorted(xs, key=lambda x: input_iter.index(x[0].replace(unicodedata.lookup('EN DASH'), unicodedata.lookup('HYPHEN-MINUS'))))
+ return ts
+
+ def __repr__(self):
+ return f"ProjectData() - with data: {id(self._data)}"
+
+
+def _convert_str_date_to_object(d_str: tuple) -> Tuple[str, Optional[datetime.date]]:
+ try:
+ if re.match(DATE_REGEX_4, d_str[1]):
+ try:
+ ds = d_str[1].split('-')
+ return (d_str[0], datetime.date(int(ds[0]), int(ds[1]), int(ds[2])))
+ except TypeError:
+ return d_str
+ else:
+ return d_str
+ except TypeError:
+ return d_str
+
+
+class Master:
+ """A Master object, representing the main central data item in ``bcompiler``.
+
+ Args:
+ quarter (:py:class:`bcompiler.api.Quarter`): creating using ``Quarter(1, 2017)`` for example.
+ path (str): path to the master xlsx file
+
+ A master object is a composition between a :py:class:`bcompiler.api.Quarter` object and an
+ actual master xlsx file on disk.
+
+ You create one, either by creating the Quarter object first, and using that as the first
+ parameter of the ``Master`` constructor, e.g.::
+
+ from bcompiler.api import Quarter
+ from bcompiler.api import Master
+
+ q1 = Quarter(1, 2016)
+ m1 = Master(q1, '/tmp/master_1_2016.xlsx')
+
+ or by doing both in one::
+
+ m1 = Master(Quarter(1, 2016), '/tmp/master_1_2016.xlsx')
+
+ Once you have a ``Master`` object, you can access project data from it, like this::
+
+ project_data = m1['Project Title']
+
+
+ The following *attributes* are available on `m1` once created as such, e.g.::
+
+ data = m1.data
+ quarter = m1.quarter
+ filename = m1.filename
+ ..etc
+ """
+ def __init__(self, quarter: Quarter, path: str) -> None:
+ self._quarter = quarter
+ self.path = path
+ self._data = project_data_from_master(self.path)
+ self._project_titles = [item for item in self.data.keys()]
+ self.year = self._quarter.year
+
+ def __getitem__(self, project_name):
+ return ProjectData(self._data[project_name])
+
+ @property
+ def data(self):
+ """Return all the data contained in the master in a large, nested dictionary.
+
+ The resulting data structure contains a dictionary of :py:class:`colletions.OrderedDict` items whose
+ key is the name of a project::
+
+ "Project Name": OrderedDict("key": "value"
+ ...)
+
+ This object can then be further interrogated, for example to obtain all key/values
+ from a partictular project, by doing::
+
+ d = Master.data
+ project_data = d['PROJECT_NAME']
+
+ """
+ return self._data
+
+ @property
+ def quarter(self):
+ """Returns the ``Quarter`` object associated with the ``Master``.
+
+ Example::
+
+ q1 = m.quarter
+
+ ``q1`` can then be further interrogated as documented in :py:class:`core.temporal.Quarter`.
+
+ """
+
+ return self._quarter
+
+ @property
+ def filename(self):
+ """The filename of the master xlsx file, e.g. ``master_1_2017.xlsx``.
+ """
+ p = Path(self.path)
+ return p.name
+
+ @property
+ def projects(self):
+ """A list of project titles derived from the master xlsx.
+ """
+ return self._project_titles
+
+ def duplicate_keys(self, to_log=None):
+ """Checks for duplicate keys in a master xlsx file.
+
+ Args:
+ to_log (bool): Optional True or False, depending on whether you want to see duplicates reported in a ``WARNING`` log message. This is used mainly for internal purposes within ``bcompiler``.
+
+ Returns:
+ duplicates (set): a set of duplicated keys
+ """
+ wb = load_workbook(self.path)
+ ws = wb.active
+ col_a = next(ws.iter_cols())
+ col_a = [item.value for item in col_a]
+ seen: set = set()
+ uniq = []
+ dups: set = set()
+ for x in col_a:
+ if x not in seen:
+ uniq.append(x)
+ seen.add(x)
+ else:
+ dups.add(x)
+ if to_log and len(dups) > 0:
+ for x in dups:
+ logger.warning(f"{self.path} contains duplicate key: \"{x}\". Masters cannot contain duplicate keys. Rename them.")
+ return True
+ elif to_log and len(dups) == 0:
+ logger.info(f"No duplicate keys in {self.path}")
+ return False
+ elif len(dups) > 0:
+ return dups
+ else:
+ return False
+
+ def __repr__(self):
+ return f"Master({self.path}, {self.quarter.quarter}, {self.quarter.year})"
diff --git a/datamaps/core/row.py b/datamaps/core/row.py
new file mode 100644
index 0000000..470b753
--- /dev/null
+++ b/datamaps/core/row.py
@@ -0,0 +1,84 @@
+import string
+import datetime
+
+from ..process.cell import Cell
+from typing import Iterable
+
+from itertools import chain
+
+
+class Row:
+ """
+ A Row object is populated with an iterable (list or other sequence), bound
+ to an openpyxl worksheet. It is used to populate a row of cells in an output
+ Excel file with the values from the iterable.
+
+ The ``anchor_column`` and ``anchor_row`` parameters represent the coordinates of
+ a cell which form the *leftmost* cell of the row, i.e. to set the row of data
+ to start at the very top left of a sheet, you'd create the ``Row()`` object this::
+
+ r = Row(1, 1, interable)
+ r.bind(ws)
+ """
+
+ def __init__(self, anchor_column: int, anchor_row: int, seq: Iterable):
+ if isinstance(anchor_column, str):
+ if len(anchor_column) == 1:
+ enumerated_alphabet = list(enumerate(string.ascii_uppercase, start=1))
+ col_letter = [x for x in enumerated_alphabet if x[1] == anchor_column][0]
+ self._anchor_column = col_letter[0]
+ self._anchor_row = anchor_row
+ self._cell_map = []
+ elif len(anchor_column) == 2:
+ enumerated_alphabet = list(enumerate(list(chain(
+ string.ascii_uppercase, ["{}{}".format(x[0], x[1]) for x in list(zip(['A'] * 26, string.ascii_uppercase))])), start=1))
+ col_letter = [x for x in enumerated_alphabet if x[1] == anchor_column][0]
+ self._anchor_column = col_letter[0]
+ self._anchor_row = anchor_row
+ self._cell_map = []
+ else:
+ raise ValueError("You can only have a column up to AZ")
+ else:
+ self._anchor_column = anchor_column
+ self._anchor_row = anchor_row
+ self._cell_map = []
+ self._seq = seq
+
+
+ def _basic_bind(self, ws):
+ for x in list(enumerate(self._seq, start=self._anchor_column)):
+ self._ws.cell(row=self._anchor_row, column=x[0], value=x[1])
+
+
+ def _cell_bind(self, ws):
+ self._cell_map = []
+ for x in list(enumerate(self._seq, start=self._anchor_column)):
+ self._cell_map.append(
+ Cell(
+ cell_key="",
+ cell_value=x[1],
+ cell_reference=f"{self._anchor_column}{self._anchor_row}",
+ template_sheet=ws,
+ bg_colour=None,
+ fg_colour=None,
+ number_format=None,
+ verification_list=None,
+ r_idx=self._anchor_row,
+ c_idx=x[0]
+ )
+ )
+ for c in self._cell_map:
+ if not isinstance(c.cell_value, datetime.date) and not None:
+ self._ws.cell(row=c.r_idx, column=c.c_idx, value=c.cell_value).number_format = '0'
+ else:
+ self._ws.cell(row=c.r_idx, column=c.c_idx, value=c.cell_value)
+
+
+
+ def bind(self, worksheet):
+ """Bind the Row to a particular worksheetl, which effectively does the
+ printing of data into cells. Must be done prior to saving the workbook.
+ """
+ self._ws = worksheet
+# self._basic_bind(self._ws)
+ self._cell_bind(self._ws)
diff --git a/datamaps/core/temporal.py b/datamaps/core/temporal.py
new file mode 100644
index 0000000..ddb7004
--- /dev/null
+++ b/datamaps/core/temporal.py
@@ -0,0 +1,121 @@
+import datetime
+
+
+class FinancialYear:
+ """An object representing a financial year.
+
+ Used by ``bcompiler`` internally when creating :py:class:`bcompiler.api.Master` objects.
+ Can be used to calculate start and ends dates and :py:class:`bcompiler.api.Quarter` objects.
+
+ If parameter ``year`` must be in the range 150 - 2100.
+
+ """
+
+ def __init__(self, year):
+ if isinstance(year, int) and (year in range(150, 2100)):
+ self.year = year
+ else:
+ raise ValueError("A year must be an integer between 1950 and 2100")
+ self._generate_quarters()
+ self._q1 = self.quarters[0]
+ self._q2 = self.quarters[1]
+ self._q3 = self.quarters[2]
+ self._q4 = self.quarters[3]
+
+ self.start_date = self.q1.start_date
+ self.end_date = self.q4.end_date
+
+ @property
+ def q1(self) -> datetime.date:
+ """Quarter 1 as a :py:class:`datetime.date` object
+ """
+ return self._q1
+
+ @property
+ def q2(self):
+ """Quarter 2 as a :py:class:`datetime.date` object
+ """
+ return self._q2
+
+ @property
+ def q3(self):
+ """Quarter 3 as a :py:class:`datetime.date` object
+ """
+ return self._q3
+
+ @property
+ def q4(self):
+ """Quarter 4 as a :py:class:`datetime.date` object
+ """
+ return self._q4
+
+ def __str__(self):
+ return f"FY{str(self.year)}/{str(self.year + 1)[2:]}"
+
+ def _generate_quarters(self):
+ self.quarters = [Quarter(x, self.year) for x in range(1, 5)]
+
+
+ def __repr__(self):
+ return f"FinancialYear({self.year})"
+
+
+class Quarter:
+ """An object representing a financial quarter. This is mainly required for building
+ a :py:class:`core.master.Master` object.
+
+ Args:
+ quarter (int): e.g.1, 2, 3 or 4
+ year (int): e.g. 2013
+ """
+ _start_months = {
+ 1: (4, 'April'),
+ 2: (7, 'July'),
+ 3: (10, 'October'),
+ 4: (1, 'January')
+ }
+
+ _end_months = {
+ 1: (6, 'June', 30),
+ 2: (9, 'September', 30),
+ 3: (12, 'December', 31),
+ 4: (3, 'March', 31),
+ }
+
+
+ def __init__(self, quarter: int, year: int) -> None:
+
+ if isinstance(quarter, int) and (quarter >= 1 and quarter <= 4):
+ self.quarter = quarter
+ else:
+ raise ValueError("A quarter must be either 1, 2, 3 or 4")
+
+ if isinstance(year, int) and (year in range(1950, 2100)):
+ self.year = year
+ else:
+ raise ValueError("Year must be between 1950 and 2100 - surely that will do?")
+
+ self.start_date = self._start_date(self.quarter, self.year)
+ self.end_date = self._end_date(self.quarter, self.year)
+
+ def __str__(self):
+ return f"Q{self.quarter} {str(self.year)[2:]}/{str(self.year + 1)[2:]}"
+
+ def _start_date(self, q, y):
+ if q == 4:
+ y = y + 1
+ return datetime.date(y, Quarter._start_months[q][0], 1)
+
+ def _end_date(self, q, y):
+ if q == 4:
+ y = y + 1
+ return datetime.date(y, Quarter._end_months[q][0], Quarter._end_months[q][2])
+
+ def __repr__(self):
+ return f"Quarter({self.quarter}, {self.year})"
+
+ @property
+ def fy(self):
+ """Return a :py:class:`core.temporal.FinancialYear` object.
+ """
+ return FinancialYear(self.year)