Source code for eezz.http_agent

# -*- coding: utf-8 -*-
"""
 his module implements the following classes

    * :py:class:`eezz.http_agent.THttpAgent`:  The handler for incoming WEB-socket requests

    The interaction with the JavaScript via WEB-Socket includes generation of HTML parts for user interface updates.
    It inherits from abstract eezz.websocket.TWebSocketAgent and implements the method "handle_request"
    The class provides methods to compile EEZZ extensions and to generate complex DOM structures.

"""
import  io
import  json
import  uuid
import  copy
from    copy import deepcopy

from    pathlib         import Path
from    typing          import Callable
from    bs4             import Tag, BeautifulSoup, NavigableString
from    itertools       import product, chain
from    eezz.table      import TTable, TTableCell, TTableRow
from    eezz.websocket  import TWebSocketAgent
from    eezz.service    import TService, TServiceCompiler, TTranslate
from    lark            import Lark, UnexpectedCharacters, Tree, UnexpectedEOF
from    loguru          import logger


[docs] class THttpAgent(TWebSocketAgent): """ Agent handles WEB socket events """ def __init__(self): super().__init__() self.soup = None
[docs] @logger.catch def handle_request(self, request_data: dict) -> dict | None: """ Handle WEB socket requests * **initialize**: The browser sends the complete HTML for analysis. * **call**: The request issues a method call and the result is sent back to the browser :param dict request_data: The request send by the browser :return: Response in JSON stream, containing valid HTML parts for the browser """ x_updates = list() x_tasks = list() x_result = dict() if 'initialize' in request_data: self.soup = BeautifulSoup(request_data['initialize'], 'html.parser', multi_valued_attributes=None) for x in self.soup.css.select('table[data-eezz-compiled]'): x_html = self.generate_html_table(x, x['id']) x_id = x['id'] if x.attrs.get('data-eezz-json'): x_gen_request = {'call': {'function': 'get_header_row', 'args': {}, 'id': x_id}} x_gen_request.update(json.loads(x['data-eezz-json'])) x_tasks.append((x_id, x_gen_request)) x_updates.append({'target': f'{x_id}.caption.innerHTML', 'value': x_html['caption']}) x_updates.append({'target': f'{x_id}.thead.innerHTML', 'value': x_html['thead']}) x_updates.append({'target': f'{x_id}.tbody.innerHTML', 'value': x_html['tbody']}) x_updates.append({'target': f'{x_id}.tfoot.innerHTML', 'value': x_html['tfoot']}) for x in self.soup.css.select('.clzz_grid[data-eezz-compiled]'): x_html = self.generate_html_grid(x) x_id = x['id'] x_updates.append({'target': f'{x_id}.innerHTML', 'value': x_html['tbody']}) if x.attrs.get('data-eezz-json'): x_gen_request = {'call': {'function': 'get_header_row', 'args': {}, 'id': x_id}} x_gen_request.update(json.loads(x['data-eezz-json'])) x_tasks.append((x_id, x_gen_request)) # manage translation if service started with command line option --translate: if TService().translate: x_translate = TTranslate() x_translate.generate_pot(self.soup, request_data['title']) x_result.update({'update': x_updates, 'event': 'init', 'tasks': x_tasks}) return x_result # A call request consists of a method to call and a set ot tags to update (updates might be an empty list) # The update is a list of key-value pairs separated by a colon # The key is the html-element-id and the attribute separated by dot # The value is a valid HTML string to replace either the attribute value or part of the element (for example table.body) if 'call' in request_data or 'update' in request_data: try: # Only interested in the x_tag object, which is stored as key(object, method-name) in the TService database # The method itself is executed in module TWebSocketClient x_event = request_data.get('call') if not x_event: x_event = request_data.get('update') x_keys = list(x_event.keys()) x_event = x_event[x_keys[0]] x_event_id = x_event['id'] x_row = request_data['result'] # x_obj, x_method, x_tag, x_descr = TService().get_method(x_event_id, x_event['function']) x_tag, x_descr = TService().get_tag_ref(x_event_id) for x_key, x_value in request_data['update'].items(): x_sub_keys = x_key.split('.') if x_key.startswith('this') and 'tbody' in x_sub_keys: x_html = self.generate_html_table(x_tag, x_event_id) x_updates.append({'target': f'{x_event_id}.tbody.innerHTML', 'value': x_html['tbody']}) elif 'tbody' in x_sub_keys: x_table_tag = self.soup.find(id=x_sub_keys[0]) x_html = self.generate_html_table(x_table_tag, x_sub_keys[0]) x_updates.append({'target': f'{x_sub_keys[0]}.tbody.innerHTML', 'value': x_html['tbody']}) elif x_key == 'this.subtree': x_id = x_row.id x_table: TTable = x_row.child x_target = x_value.split('.') if x_target[0] != 'this': x_tag: Tag = self.soup.find(id=x_target[0]) if not x_tag: logger.error(f'target for subtree not found (ignored): {x_target[0]}') continue if x_tag.name.lower() == 'table': if x_table is None: # There is no subtree: Send empty value to collapse the tree view x_updates.append({'target': f'{x_id}.subtree', 'value': ''}) continue # Send a subtree template and data TService().objects.update({x_id: (x_table, x_tag, x_descr)}) x_tag_list = x_tag.css.select('tr[data-eezz-json]') for x in x_tag_list: if isinstance(x, dict) and x.get('call'): x['call']['id'] = x_id x_html = self.generate_html_table(x_tag, x_id) x_id = x_row.id x_updates.append({'target': f'{x_id}.subtree', 'value': {'option': x_value, 'template': x_html['template'], 'thead': x_html['thead'], 'tbody': x_html['tbody'], 'tfoot': x_html['tfoot']}}) else: tag_row_template = x_tag.soup.css.select('[data-eezz-template=row]')[0] x_subtree_view = self.generate_html_grid_item(tag_row_template, x_row, x_table) x_updates.append({'target': f'{x_value}.innerHTML', 'value': {'tbody': str(x_subtree_view)}}) else: for x in request_data['result-value']: x_updates.append(x) except KeyError as ex: logger.warning(f'KeyError {ex.args}') x_result = {'update': x_updates} return x_result
[docs] def do_get(self, a_resource: Path | str, a_query: dict) -> str: """ Response to an HTML GET command The agent reads the source, compiles the data-eezz sections and adds the web-socket component It returns the enriched document :param pathlib.Path a_resource: The path to the HTML document, containing EEZZ extensions :param dict a_query: The query string of the URL :return: The compiled version of the HTML file """ x_html = a_resource x_service = TService() if isinstance(a_resource, Path): with a_resource.open('r', encoding="utf-8") as f: x_html = f.read() x_parser = Lark.open(str(Path(x_service.resource_path) / 'eezz.lark')) x_soup = BeautifulSoup(x_html, 'html.parser', multi_valued_attributes=None) # The template table is used to add missing structures as default x_templ_path = x_service.resource_path / 'template.html' with x_templ_path.open('r') as f: x_template = BeautifulSoup(f.read(), 'html.parser', multi_valued_attributes=None) x_templ_table = x_template.body.table for x_chrom in x_soup.css.select('table[data-eezz]'): if not x_chrom.css.select('caption'): x_chrom.append(copy.deepcopy(x_templ_table.caption)) if not x_chrom.css.select('thead'): x_chrom.append(copy.deepcopy(x_templ_table.thead)) if not x_chrom.css.select('tbody'): x_chrom.append(copy.deepcopy(x_templ_table.tbody)) if not x_chrom.css.select('tfoot'): x_chrom.append(copy.deepcopy(x_templ_table.tfoot)) if not x_chrom.has_attr('id'): x_chrom['id'] = str(uuid.uuid1())[:8] # Compile subtree using the current table id for events self.compile_data(x_parser, x_chrom.css.select('[data-eezz]'), x_chrom['id']) for x_chrom in x_soup.css.select('select[data-eezz], .clzz_grid[data-eezz]'): if not x_chrom.has_attr('id'): x_chrom['id'] = str(uuid.uuid1())[:8] self.compile_data(x_parser, x_chrom.css.select('[data-eezz]'), x_chrom['id']) # Compiling the reset of the document self.compile_data(x_parser, x_soup.css.select('[data-eezz]'), '', a_query) return x_soup.prettify()
[docs] @staticmethod def compile_data(a_parser: Lark, a_tag_list: list, a_id: str, a_query: dict = None) -> None: """ Compile data-eezz-json to data-eezz-compile, create tag attributes and generate tag-id to manage incoming requests :param Lark a_parser: The Lark parser to compile EEZZ to json :param list a_tag_list: HTML-Tag to compile :param str a_id: The ID of the tag to be identified for update :param dict a_query: The query of the HTML request """ # logger.debug(f'compile data \n{a_tag_list[0].prettify()}') x_service = TService() for x_tag in a_tag_list: x_id = a_id # --- get instead of pop x_data = x_tag.attrs.get('data-eezz') try: if not x_data: return if not x_id: if x_tag.has_attr('id'): x_id = x_tag.attrs['id'] if x_tag.get('data-eezz-compiled') == "ok": continue x_syntax_tree = a_parser.parse(x_data) x_transformer = TServiceCompiler(x_tag, x_id, a_query) x_tree = x_transformer.transform(x_syntax_tree) x_json = dict() x_list_items = x_tree.children if isinstance(x_tree, Tree) else [x_tree] x_tag['data-eezz-compiled'] = "ok" for x_part in x_list_items: x_part_json = {x_part_key: x_part_val for x_part_key, x_part_val in x_part.items() if x_part_key in ('update', 'call', 'process', 'onload')} x_json.update(x_part_json) if x_json: x_tag['data-eezz-json'] = json.dumps(x_json) # logger.debug(f'{x_data} ==> {x_list_items}') if x_tag.has_attr('data-eezz-template') and x_tag['data-eezz-template'] == 'websocket': x_path = Path(x_service.resource_path / 'websocket.js') x_ws_descr = """var g_eezz_socket_addr = "ws://{host}:{port}";\n """.format(host=TService().host, port=TService().websocket_addr, args='') x_ws_descr += """var g_eezz_arguments = "{args}";\n """.format(args='') with x_path.open('r') as f: x_ws_descr += f.read() x_tag.string = x_ws_descr except (UnexpectedCharacters, UnexpectedEOF) as ex: logger.error(f'{repr(ex)} position = {ex.pos_in_stream} in {x_data=}') logger.exception(ex)
[docs] @staticmethod def format_attributes(a_key: str, a_value: str, a_fmt_funct: Callable, a_id: str = None) -> str: """ Eval template tag-attributes, diving deep into data-eezz-json :param str a_id: The ID which replaces the placeholder 'this' :param str a_key: Thw key string to pick the items in an HTML tag :param str a_value: The dictionary in string format to be formatted :param Callable a_fmt_funct: The function to be called to format the values :return: The formatted string """ if a_key == 'data-eezz-json': x_json = json.loads(a_value) if 'call' in x_json: x_args = x_json['call']['args'] x_json['call']['args'] = {x_key: a_fmt_funct(x_val) for x_key, x_val in x_args.items()} if a_id: x_json['call']['id'] = a_id x_fmt_val = json.dumps(x_json) else: x_fmt_val = a_fmt_funct(a_value) return x_fmt_val
[docs] @staticmethod def format_attributes_update(json_str: str, formatter: Callable) -> str: """ Special routine to format function arguments in the update section :param Callable formatter: Format function for values in curly brackets :param str json_str: An eezz generated json string :return: formatted function arguments """ x_json_obj = json.loads(json_str) x_update = x_json_obj.get('update', x_json_obj.get('onload')) if not x_update: return json_str for x_key, x_val in x_update.items(): if isinstance(x_val, str): x_update[x_key] = formatter(x_val) else: if x_args := x_val.get('args', None): for xx_key, xx_value in x_args.items(): x_args.update({xx_key: formatter(xx_value)}) return json.dumps(x_json_obj)
[docs] def generate_html_cells(self, a_tag: Tag, a_cell: TTableCell) -> Tag: """ Generate HTML cells :param bs4.Tag a_tag: The template tag to generate a table cell :param TTableCell a_cell: The cell providing the data for the HTML tag element :return: A new tag, based on the template and the cell data :rtype: bs4.Tag """ x_fmt_attrs = {x: self.format_attributes(x, y, lambda z: z.format(cell=a_cell)) for x, y in a_tag.attrs.items()} x_new_tag = copy.deepcopy(a_tag) for x in x_new_tag.descendants: if x and isinstance(x, Tag): x.string = x.string.format(cell=a_cell) if x_new_tag.string: x_new_tag.string = a_tag.string.format(cell=a_cell) x_new_tag.attrs = x_fmt_attrs # store the date-time in attribute, so it could be used for in-place formatting: if a_cell.type in ('datetime', 'date', 'time'): x_new_tag['timestamp'] = str(a_cell.value.timestamp()) return x_new_tag
[docs] def generate_html_rows(self, a_html_cells: list, a_tag: Tag, a_row: TTableRow) -> Tag: """ This operation add fixed cells to the table. Cells which are not included as template for table data are used to add a constant info to the row :param list a_html_cells: A list of cells to build up a row :param bs4.Tag a_tag: The parent containing the templates for the row :param TTableRow a_row: The table row values to insert :return: The row with values rendered to HTML """ x_fmt_attrs = {x: self.format_attributes(x, y, lambda z: z.format(row=a_row)) for x, y in a_tag.attrs.items()} x_html_cells = [[copy.deepcopy(x)] if not x.has_attr('data-eezz-compiled') else a_html_cells for x in a_tag.css.select('th,td')] x_html_cells = list(chain.from_iterable(x_html_cells)) try: for x in x_html_cells: if x.has_attr('reference') and x['reference'] == 'row': for x_child in x.descendends: if isinstance(x_child, NavigableString): x.parent.string = x.format(row=a_row) except AttributeError as ex: logger.debug(str(ex)) if a_row.row_id: x_fmt_attrs['id'] = a_row.id x_new_tag = Tag(name=a_tag.name, attrs=x_fmt_attrs) for x in x_html_cells: x_new_tag.append(x) return x_new_tag
[docs] def generate_html_table(self, a_table_tag: Tag, a_id: str) -> dict: """ Generates a table structure in four steps 1. Get the column order and the viewport 2. Get the row templates 3. Evaluate the table cells 4. Send the result separated by table main elements :param bs4.Tag a_table_tag: The parent table tag to produce the output :param str a_id: Unique table ID :return: The generated table separated in sections as dictionary :rtype: dict """ x_table_obj: TTable = TService().get_object(a_id) x_row_template = a_table_tag.css.select('tr[data-eezz-compiled]') x_row_viewport = list(x_table_obj.get_visible_rows()) x_table_header = x_table_obj.get_header_row() # insert the header, so that we could manage header and body in a single stack x_row_viewport.insert(0, x_table_header) # Re-arrange the cells for output x_range = list(range(len(x_table_header.cells))) x_range_cells = [[x_row.cells[index] for index in x_range] for x_row in x_row_viewport] for x_row, x_cells in zip(x_row_viewport, x_range_cells): x_row.cells = x_cells # Evaluate match: It's possible to have a template for each row type (header and body): x_format_row = [([x_tag for x_tag in x_row_template if x_tag.has_attr('data-eezz-match') and x_tag['data-eezz-match'] in x_row.type], x_row) for x_row in x_row_viewport] x_format_cell = [(list(product(x_tag[0].css.select('td[data-eezz-compiled],th[data-eezz-compiled]'), x_row.cells)), x_tag[0], x_row) for x_tag, x_row in x_format_row if x_tag] # Put all together and create HTML x_list_html_cells = [([self.generate_html_cells(x_tag, x_cell) for x_tag, x_cell in x_row_templates], x_tag_tr, x_row) for x_row_templates, x_tag_tr, x_row in x_format_cell] x_list_html_rows = [(self.generate_html_rows(x_html_cells, x_tag_tr, x_row)) for x_html_cells, x_tag_tr, x_row in x_list_html_cells] x_footer = self.generate_table_footer(a_table_tag, x_table_obj, a_id) x_html = dict() x_html['template'] = a_table_tag.prettify() x_html['caption'] = a_table_tag.caption.string.format(table=x_table_obj) x_html['thead'] = ''.join([str(x) for x in x_list_html_rows[:1]]) if len(x_list_html_rows) > 0 else '' x_html['tbody'] = ''.join([str(x) for x in x_list_html_rows[1:]]) if len(x_list_html_rows) > 1 else '' x_html['tfoot'] = x_footer.tr.prettify() if x_footer else '' return x_html
[docs] def generate_html_grid(self, a_tag: Tag) -> dict: """ Besides the table, supported display is grid via class clzz_grid or select :param bs4.Tag a_tag: The HTML tag, which is assigned to a TTable object :return: The DOM of the generated grid as dictionary with key "tbody" :rtype: dict """ x_row_template = a_tag.css.select('[data-eezz-template=row]') x_table = TService().get_object(a_tag.attrs['id']) x_row_viewport = list(x_table.get_visible_rows()) x_format_row = [([x_tag for x_tag in x_row_template if x_tag.has_attr('data-eezz-match') and x_tag['data-eezz-match'] in x_row.type], x_row) for x_row in x_row_viewport] x_list_children = [self.generate_html_grid_item(x_tag[0], x_row, x_table) for x_tag, x_row in x_format_row] return {"tbody": ''.join([str(x) for x in x_list_children])}
[docs] def generate_html_grid_item(self, tag_template: Tag, a_row: TTableRow, a_table: TTable) -> Tag: """ Generates elements of the same kind, derived from a template and update content according the row values :param bs4.Tag tag_template: Template for the entire tile :param TTableRow a_row: Row with data for the specific tile :param TTable a_table: Parent table :return: Generated HTML tag """ # Generate name-value pairs for this row: x_format_cell = {x: y for x, y in zip(a_row.column_descr, a_row.cells)} x_new_element = deepcopy(tag_template) # Cells might split into details if x_cell_templates := x_new_element.css.select('[data-eezz-template=cell]'): for x_tag in x_cell_templates: x_ref = x_tag.attrs.get('data-eezz-reference').split('.') x_cell = x_format_cell.get(x_ref[0], None) if not x_cell: logger.warning(f'no reference to {x_ref[0]}') continue # Cells details part and continue. # Detail tags have to be generated from template. The template will be removed as a last step x_cell.id = a_row.row_id if len(x_ref) > 1 and x_ref[1] == 'detail': x_templates = [(deepcopy(x_tag), detail) for detail in x_cell.details] for x_cnt, x_item in enumerate(x_templates): x_template, x_detail = x_item x_detail.index = x_cnt x_template.attrs['id'] = f'{a_table.id}-{a_row.row_id}-{x_cell.index}-{x_cnt}' x_attrs = {x_key: x_val.format(detail=x_detail) for x_key, x_val in x_template.attrs.items() if not x_key.startswith('data-eezz')} if x_json_str := x_template.attrs.get('data-eezz-json'): x_template.attrs['data-eezz-json'] = self.format_attributes_update(x_json_str, lambda x: x.format(detail=x_detail)) x_template.attrs.update(x_attrs) if x_template.string: x_template.string = x_template.string.format(detail=x_detail) x_tag.insert_before(x_template) x_tag.decompose() continue # Cells main part # Generate unique ID, format call parameter, attributes and strings x_cell.id = a_row.row_id x_attrs = {x_key: x_val.format(cell=x_cell) for x_key, x_val in x_tag.attrs.items() if not x_key.startswith('data-eezz')} if x_json_str := x_tag.attrs.get('data-eezz-json'): x_tag.attrs['data-eezz-json'] = self.format_attributes_update(x_json_str, lambda x: x.format(cell=x_cell)) if x_json_str := x_tag.attrs.get('data-eezz-i18n'): x_tag.attrs['data-eezz-i18n'] = x_json_str.format(cell=x_cell) x_tag.attrs.update(x_attrs) x_tag.attrs['data-eezz-index'] = str(x_cell.index) x_tag.attrs['id'] = f'{a_table.id}-{a_row.row_id}-{x_cell.index}' if x_tag.string: x_tag.string = x_tag.string.format(cell=x_cell) if x_format := x_tag.attrs.get('data-eezz-format'): x_html = f'<p>{x_format.join(x_tag.string.split('\n\n'))}</p>' x_soup = BeautifulSoup(x_html, 'html.parser') x_tag.string = '' x_tag.append(x_soup.p) return x_new_element
if __name__ == '__main__': """:meta private:""" text2 = """ <table id='directory' data-eezz='name: directory, assign: examples.directory.TDirView(path=".", title="dir")'> <tbody> <tr data-eezz='template: row, match: body, event : expand_collapse(rowid={row.row_id}), update : this.subtree=restricted, path_label.innerHTML={row.row_id}'> </tr> </tbody> </table> """ """ """ # list_table = aSoup.css.select('table[data-eezz]') TService.set_environment(root_path=r'C:\Users\alzer\Projects\github\eezz_full\webroot') xx_gen = THttpAgent() xx_html = xx_gen.do_get(text2, dict()) xx_soup = BeautifulSoup(xx_html, 'html.parser', multi_valued_attributes=None) # xx_h1_set = xx_soup.css.select('h1') # xx_h1 = xx_h1_set[0] # xx_str = xx_h1.string # print(xx_str.parent) list_table = xx_soup.css.select('table') for xx in list_table: xjsn = xx['data-eezz'] # json.loads( f'{{ {xjsn} }}' ) # xjsn['assign'] = 'new values for subtree' logger.debug(f'{{ {xjsn} }}') xx_table = xx_gen.generate_html_table(xx, xx['id']) debug_out = io.StringIO() print(json.dumps(xx_table, indent=2), file=debug_out) logger.debug(debug_out.getvalue()) logger.success('done')