Source code for xlref.parser

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
#
# Copyright 2020-2024 Vincenzo Arcidiacono;
# Licensed under the EUPL (the 'Licence');
# You may not use this work except in compliance with the Licence.
# You may obtain a copy of the Licence at: http://ec.europa.eu/idabc/eupl
"""
It provides xlparser reference parser class.
"""
import io
import re
import string
import logging
import numpy as np
import os.path as osp
from .filters import FILTERS
from .errors import InvalidSyntax, InvalidReference, NoFullCell

log = logging.getLogger(__name__)

_primitive_dir = dict(zip(
    'LURD', np.array([[0, -1], [-1, 0], [0, 1], [1, 0]], int)
))

# Compiled pattern for a whole reference string, laid out as:
#
#     [file]#[sheet!]<col><row>[(moves)][:<col><row>[(moves)]][:<exp>][filters]
#
# Named groups: ``file``, ``sheet``, ``st_col``/``st_row``/``st_mov`` (first
# cell), ``nd_col``/``nd_row``/``nd_mov`` (optional second cell),
# ``range_exp`` (optional run of L/U/R/D letters) and ``filters`` (optional
# text delimited by ``[``/``{`` and ``]``/``}``).
# A column is a run of letters and a row a run of digits; both also accept the
# symbols ``_`` and ``^``, and the second cell additionally accepts ``.``.
# The pattern is compiled with ``re.X``, so the ``#`` separator has to be
# escaped (an unescaped ``#`` starts a pattern comment), and with
# ``re.IGNORECASE``, so letters match in either case.
# noinspection RegExpRedundantEscape
_re_xl_ref_parser = re.compile(
    r"""
    ^\s*(?:(?P<file>[^!#]+)?)?\s*\#\s*                      # xl file name
    (?:(?P<sheet>[^!]+)?!)?\s*                              # xl sheet name
    (?:                                                     # first cell
        (?P<st_col>[A-Z]+|_|\^)\s*                          # first col
        (?P<st_row>\d+|_|\^)\s*                             # first row
        (?:\(\s*
            (?P<st_mov>L|U|R|D|LD|LU|UL|UR|RU|RD|DL|DR)\s*  # moves from st cell
            \)
        )?
    )\s*
    (?::\s*                                                 # second cell [opt]
        (?P<nd_col>[A-Z]+|_|\^|\.)\s*                       # second col
        (?P<nd_row>\d+|_|\^|\.)\s*                          # second row
        (?:\(\s*
            (?P<nd_mov>L|U|R|D|LD|LU|UL|UR|RU|RD|DL|DR)\s*  # moves from nd cell
            \)
        )?
    )?\s*
    (?::\s*
            (?P<range_exp>[LURD]+)                          # expansion [opt]
    )?\s*
    (?:
        \s*(?P<filters>[\[{].*[\]}])\s*                     # filters [opt]
    )?\s*$""", re.IGNORECASE | re.X)


def _row2num(coord):
    if coord in '^_.':
        return coord
    return int(coord) - 1


def _col2num(coord):
    if coord in '^_.':
        return coord
    num = 0
    for c in coord.upper():
        num = num * 26 + string.ascii_uppercase.rindex(c) + 1
    return num - 1


def _num2col(num):
    d = num // 26
    chr1 = _num2col(d) if d > 0 else ''
    return '%s%s' % (chr1, chr(ord('A') + num % 26))


class Range:
    """
    Rectangle of cells delimited by two ``(row, col)`` cells.

    Coordinates are stored zero-based; `__repr__` renders them as one-based
    rows with column letters (e.g. ``A1:C3``).
    """

    def __init__(self, st_cell, nd_cell):
        """
        :param st_cell: first cell as a ``(row, col)`` pair.
        :param nd_cell: second cell as a ``(row, col)`` pair.
        """
        (self.r0, self.c0), (self.r1, self.c1) = st_cell, nd_cell

    def get(self):
        """Return the bounds as the tuple ``(r0, c0, r1, c1)``."""
        return self.r0, self.c0, self.r1, self.c1

    def __repr__(self):
        return '%s%d:%s%d' % (
            _num2col(self.c0), self.r0 + 1, _num2col(self.c1), self.r1 + 1
        )
# noinspection PyTypeChecker
class Ref:
    """
    Reference parser.

    Parses a reference string with `_re` and lazily resolves it to a
    workbook (`book`), a sheet array (`sheet`), a cell rectangle (`range`)
    and the filtered cell values (`values`). Every resolved item is memoised
    in the `ref` dictionary.
    """
    # Directory against which relative file names are resolved when the
    # reference has no parent.
    _curr_dir = '.'
    # File extension -> pandas ``ExcelFile`` engine; the ``None`` entry is the
    # fallback for unknown extensions.
    _engines = {
        'xlsx': 'openpyxl',
        'xlsxm': 'openpyxl',
        'xls': 'xlrd',
        'xlsm': 'openpyxl',
        'odf': 'odf',
        'ods': 'odf',
        'odt': 'odf',
        'csv': 'none',
        'xlsb': 'pyxlsb',
        None: 'openpyxl'
    }
    _re = _re_xl_ref_parser
    # Keyword arguments forwarded to ``workbook.parse`` when loading a sheet.
    _open_sheet_kw = {'header': None}
    _open = open

    def _match(self, ref):
        """
        Match `ref` against the reference pattern.

        :return: the dictionary of named groups.
        :raises InvalidSyntax: if `ref` does not match.
        """
        m = self._re.match(ref)
        if not m:
            raise InvalidSyntax(ref)
        return m.groupdict()

    def __init__(self, ref, parent=None, cache=None):
        """
        :param ref: reference string to parse.
        :param parent: reference from which a missing file or sheet is
            inherited.
        :param cache: mapping shared between references; it stores workbooks
            by absolute path and sheet arrays by ``(workbook, sheet name)``.
            A new dictionary is created when omitted.
        :raises InvalidSyntax: if `ref` does not match the pattern.
        :raises InvalidReference: if any other error occurs while parsing.
        """
        try:
            d = self._match(ref)
            p = d.pop
            d['st_ref'] = self._ref(p('st_col'), p('st_row'), p('st_mov'))
            d['nd_ref'] = self._ref(p('nd_col'), p('nd_row'), p('nd_mov'))
            d['filters'] = self._parse_filters(d['filters'] or '[]')
            self.ref = d
            self.parent = parent
            self.cache = {} if cache is None else cache
        except InvalidSyntax:
            raise
        except Exception as ex:
            # Keep the original error as the explicit cause.
            raise InvalidReference(ref, ex) from ex

    @staticmethod
    def _parse_filters(s):
        """Decode the JSON filter text; a single object becomes a list."""
        from json import loads
        v = loads(s)
        return [v] if isinstance(v, dict) else v

    @staticmethod
    def _ref(cell_col, cell_row, cell_mov):
        """
        Build the ``((row, col), moves)`` description of a cell.

        :return: ``None`` when column, row and moves are all missing,
            otherwise the converted coordinates and the upper-cased moves
            (``None`` when no moves were given).
        """
        if cell_col is None and cell_row is None and cell_mov is None:
            return None
        row = _row2num(cell_row)
        col = _col2num(cell_col)
        mov = cell_mov.upper() if cell_mov else None
        return (row, col), mov

    def _open_workbook(self, fpath):
        """
        Open the workbook stored at `fpath`.

        The engine is selected from the lower-cased file extension. The
        returned object gets a ``sheet_indices`` attribute mapping
        lower-cased sheet names to their position. For ``csv`` files the
        data is read with ``pandas.read_csv`` and stored directly in the
        cache as the only sheet.
        """
        from pandas import ExcelFile
        ext = osp.splitext(fpath.lower())[1][1:]
        engine = self._engines.get(ext, self._engines[None])
        if engine == 'none':
            # Register a dummy engine so that ``ExcelFile`` can be created
            # without parsing any content.
            ExcelFile._engines['none'] = lambda *args, **kwargs: None
            wb = ExcelFile(io.BytesIO(), engine=engine)
            wb.sheet_indices = {'sheet1': 0}
        else:
            with self._open(fpath, 'rb') as f:
                wb = ExcelFile(io.BytesIO(f.read()), engine=engine)
            wb.sheet_indices = {
                k.lower(): i for i, k in enumerate(wb.sheet_names)
            }
        if ext in ('csv',):
            import pandas as pd
            sn = {j: i for i, j in wb.sheet_indices.items()}[0]
            with self._open(fpath, 'rb') as f:
                self.cache[(wb, sn)] = pd.read_csv(f, header=None).values
        return wb

    def _open_sheet(self, workbook, name):
        """Return the values of sheet `name` (a name or index) as an array."""
        name = getattr(workbook, 'sheet_indices', {}).get(name, name)
        return workbook.parse(name, **self._open_sheet_kw).values

    @property
    def book(self):
        """
        Workbook of the reference.

        A file name is resolved against the directory of the parent's file
        (or `_curr_dir` without parent) and the workbook is taken from, or
        stored in, the cache. Without a file name the parent's workbook is
        used.
        """
        if 'xl_book' not in self.ref:
            fp = self.ref['file']
            if fp:
                curr_dir = self._curr_dir
                if self.parent:
                    curr_dir = osp.dirname(self.parent.ref['fpath'])
                self.ref['fpath'] = fp = osp.abspath(osp.join(curr_dir, fp))
                if fp in self.cache:
                    wb = self.cache[fp]
                else:
                    self.cache[fp] = wb = self._open_workbook(fp)
            else:
                # Resolve the parent first: its 'fpath' entry only exists
                # once its workbook has been resolved.
                wb = self.parent.book
                self.ref['fpath'] = self.parent.ref.get('fpath')
            self.ref['xl_book'] = wb
        return self.ref['xl_book']

    @property
    def sheet(self):
        """
        Array with the values of the referenced sheet.

        Without a sheet name the parent's sheet is used; when there is no
        parent either, the sheet at position 0 of the workbook is loaded.
        """
        if 'xl_sheet' not in self.ref:
            sn = self.ref['sheet']
            if not sn and not self.parent:
                sn = {j: i for i, j in self.book.sheet_indices.items()}[0]
            if sn:
                wb, sn = self.book, sn.lower()
                if (wb, sn) in self.cache:
                    sheet = self.cache[(wb, sn)]
                else:
                    self.cache[(wb, sn)] = sheet = self._open_sheet(wb, sn)
            else:
                sheet = self.parent.sheet
            self.ref['xl_sheet'] = sheet
        return self.ref['xl_sheet']

    @property
    def full_cells(self):
        """Boolean array that is true where the sheet value is not null."""
        if 'full_cells' not in self.ref:
            from pandas import isnull
            self.ref['full_cells'] = ~isnull(self.sheet)
        return self.ref['full_cells']

    @property
    def margins(self):
        """
        Bounds of the non-empty cells.

        :return: a pair of dictionaries (rows, columns) where ``'^'`` is the
            smallest and ``'_'`` the largest index holding a non-empty cell;
            all zeros when the sheet has no such cell.
        """
        if 'margins' not in self.ref:
            indices = np.array(np.where(self.full_cells)).T
            if indices.shape[0]:
                up_r, up_c = indices.min(0)
                dn_r, dn_c = indices.max(0)
            else:
                up_r = up_c = dn_r = dn_c = 0
            m = {'^': up_r, '_': dn_r}, {'^': up_c, '_': dn_c}
            self.ref['margins'] = m
        return self.ref['margins']

    def _target_full(self, cell, moves):
        """
        Search the first non-empty cell reachable from `cell`.

        Cells are scanned along the first move; when the scan leaves the
        sheet and a second move is given, the starting cell is shifted by
        the second move and the scan is repeated.

        :param cell: starting ``(row, col)`` cell.
        :param moves: one or two letters among ``LURD``.
        :return: the coordinates of the cell found, as an array.
        :raises NoFullCell: if no non-empty cell is found.
        """
        up, dn = (0, 0), (self.margins[0]['_'], self.margins[1]['_'])
        mv = _primitive_dir[moves[0]]  # first move
        c0, full_cells = np.array(cell), self.full_cells
        # When moving up/left, start no further than the last full row/col.
        if 'U' in moves and not c0[0] <= dn[0]:
            c0[0] = dn[0]
        if 'L' in moves and not c0[1] <= dn[1]:
            c0[1] = dn[1]
        while (up <= c0).all() and (c0 <= dn).all():
            c1 = c0.copy()
            while (up <= c1).all():
                try:
                    if full_cells[c1[0], c1[1]]:
                        return c1
                except IndexError:
                    break
                c1 += mv
            try:
                c0 += _primitive_dir[moves[1]]  # second move
            except IndexError:  # only one move was given
                break
        raise NoFullCell(cell, moves)

    def _resolve_ref(self, ref, pcell=None):
        """
        Resolve a cell description into concrete ``(row, col)`` indices.

        ``^`` and ``_`` are replaced with the margins and ``.`` with the
        coordinate of `pcell`. When the cell is empty and moves are given,
        the target is searched with `_target_full`.

        :param ref: ``((row, col), moves)`` as built by `_ref`.
        :param pcell: already resolved cell used for ``.`` coordinates.
        """
        row, col = ref[0]
        if row in ('^', '_'):
            row = self.margins[0][row]
        elif row == '.':
            row = pcell[0]
        if col in ('^', '_'):
            col = self.margins[1][col]
        elif col == '.':
            col = pcell[1]
        try:
            state = self.full_cells[row, col]
        except IndexError:  # outside the sheet: treated as empty
            state = False
        if not (state or ref[1] is None):
            row, col = self._target_full((row, col), ref[1])
        return row, col

    def _expand_range(self, st, nd, range_exp):
        """
        Expand the rectangle ``st``-``nd`` along the given directions.

        Each edge is moved, by as many cells as its letter occurs in
        `range_exp`, for as long as the row/column it reaches holds a
        non-empty cell; the expansion is clipped to the margins and the
        result never shrinks below the initial rectangle.

        :return: the expanded ``(st, nd)`` cells as tuples.
        """
        range_exp, exp = range_exp.upper(), np.zeros((2, 2), int)
        for k, i in (('L', 0), ('U', 0), ('R', 1), ('D', 1)):
            exp[:, i] += _primitive_dir[k] * range_exp.count(k)
        # noinspection PyUnresolvedReferences
        empty, r0 = ~self.full_cells, np.array((st, nd), int)
        rng = r0.T + [0, 1]  # half-open [start, stop) bounds per axis
        b = np.array([all(empty[slice(*rng[0]), slice(*rng[1])].shape)])
        margins = np.array([(m['^'], m['_'] + 1) for m in self.margins]).T
        while b.any():
            # Cells already inside the range must not trigger new expansions.
            empty[slice(*rng[0]), slice(*rng[1])] = True
            r = np.clip(rng + exp, margins[0, :, None], margins[1, :, None])
            b = ~np.array([
                empty[r[0] - [0, 1], slice(*r[1])].all(1),
                empty[slice(*r[0]), r[1] - [0, 1]].all(0)
            ])
            rng = np.where(b, r, rng)
        r = (rng - [0, 1]).T
        return tuple(np.minimum(r[0], r0[0])), tuple(np.maximum(r[1], r0[1]))

    @property
    def range(self):
        """
        Resolved rectangle of the reference as a `Range`.

        The two cells are ordered so that the first is the upper-left and
        the second the lower-right corner, then the expansion is applied.
        """
        if 'rect' not in self.ref:
            nd = st = self._resolve_ref(self.ref['st_ref'])
            nd_ref, range_exp = self.ref['nd_ref'], self.ref['range_exp']
            if nd_ref is not None:
                nd = self._resolve_ref(nd_ref, st)
                r, c = (st[0], nd[0]), (st[1], nd[1])
                st, nd = (min(r), min(c)), (max(r), max(c))
            if range_exp is not None:
                st, nd = self._expand_range(st, nd, range_exp)
            self.ref['rect'] = Range(st, nd)
        return self.ref['rect']

    @property
    def values(self):
        """
        Values of the referenced range after applying the filters.

        When the range exceeds the sheet, the missing cells are filled with
        NaN before the filters are applied.
        """
        if 'values' not in self.ref:
            r0, c0, r1, c1 = self.range.get()
            v = self.sheet[r0:r1 + 1, c0:c1 + 1]
            shape, s = (r1 - r0 + 1, c1 - c0 + 1), tuple(v.shape)
            if s != shape:  # Add empty values.
                c = np.full(shape, np.nan, object)
                c[:s[0], :s[1]] = v
                v = c
            self.ref['values'] = compile_filters(self.ref['filters'], self)(v)
        return self.ref['values']
def compile_filters(filters, parent):
    """
    Compile a list of filter specifications into a single callable.

    Each specification is either a filter name or a dictionary with the keys
    ``fun`` (filter name), ``args`` (optional positional arguments) and
    ``kw`` (optional keyword arguments). When ``kw`` is missing, the
    remaining keys of the dictionary are used as keyword arguments.

    :param filters: iterable of filter specifications.
    :param parent: object passed as first argument to every filter.
    :return: a function applying the filters, in order, to a value.
    :raises KeyError: if a dictionary specification has no ``fun`` key.
    """
    steps = []
    for item in filters:
        # Work on a copy so that the caller's specification is not modified.
        spec = dict(item) if isinstance(item, dict) else {'fun': item}
        name = spec.pop('fun')
        # ``args`` is removed so that it is not passed again as a keyword
        # argument when the remaining keys are used as ``kw``.
        args = spec.pop('args', ())
        kw = spec.pop('kw', spec)
        steps.append((name, args, kw))

    def call_filters(value):
        for name, args, kw in steps:
            value = FILTERS[name](parent, value, *args, **kw)
        return value

    return call_filters