diff --git a/public/VERSION b/public/VERSION index 5ba4891c..45c46f77 100644 --- a/public/VERSION +++ b/public/VERSION @@ -1,4 +1,4 @@ -prev_commit=c3b6918ed24a5dab3fd1fec5fa949caf2cd63b69 -date=2026-01-09 16:09:55 -0500 -branch=36-lente-failing-to-run-on-vm -version=0.8-23-gc3b6918e +prev_commit=cba35ebe1c9b75983a6ee63e660d5ca4b0fb0249 +date=2026-03-06 13:13:41 -0500 +branch=43-brother-driver-has-hard-coded-file-path +version=0.8-32-gcba35ebe diff --git a/public/prism/drivers/brother_ql700/AnonymousPro-Regular.ttf b/public/prism/drivers/brother_ql/AnonymousPro-Regular.ttf similarity index 100% rename from public/prism/drivers/brother_ql700/AnonymousPro-Regular.ttf rename to public/prism/drivers/brother_ql/AnonymousPro-Regular.ttf diff --git a/public/prism/drivers/brother_ql700/TIMES.TTF b/public/prism/drivers/brother_ql/TIMES.TTF similarity index 100% rename from public/prism/drivers/brother_ql700/TIMES.TTF rename to public/prism/drivers/brother_ql/TIMES.TTF diff --git a/public/prism/drivers/brother_ql700/brother_ql/README.txt b/public/prism/drivers/brother_ql/brother_ql/README.txt similarity index 100% rename from public/prism/drivers/brother_ql700/brother_ql/README.txt rename to public/prism/drivers/brother_ql/brother_ql/README.txt diff --git a/public/prism/drivers/brother_ql700/brother_ql/__init__.py b/public/prism/drivers/brother_ql/brother_ql/__init__.py similarity index 100% rename from public/prism/drivers/brother_ql700/brother_ql/__init__.py rename to public/prism/drivers/brother_ql/brother_ql/__init__.py diff --git a/public/prism/drivers/brother_ql700/brother_ql/backends/__init__.py b/public/prism/drivers/brother_ql/brother_ql/backends/__init__.py similarity index 100% rename from public/prism/drivers/brother_ql700/brother_ql/backends/__init__.py rename to public/prism/drivers/brother_ql/brother_ql/backends/__init__.py diff --git a/public/prism/drivers/brother_ql700/brother_ql/backends/generic.py b/public/prism/drivers/brother_ql/brother_ql/backends/generic.py similarity index 100% rename from public/prism/drivers/brother_ql700/brother_ql/backends/generic.py rename to public/prism/drivers/brother_ql/brother_ql/backends/generic.py diff --git a/public/prism/drivers/brother_ql700/brother_ql/backends/helpers.py b/public/prism/drivers/brother_ql/brother_ql/backends/helpers.py similarity index 100% rename from public/prism/drivers/brother_ql700/brother_ql/backends/helpers.py rename to public/prism/drivers/brother_ql/brother_ql/backends/helpers.py diff --git a/public/prism/drivers/brother_ql700/brother_ql/backends/linux_kernel.py b/public/prism/drivers/brother_ql/brother_ql/backends/linux_kernel.py similarity index 100% rename from public/prism/drivers/brother_ql700/brother_ql/backends/linux_kernel.py rename to public/prism/drivers/brother_ql/brother_ql/backends/linux_kernel.py diff --git a/public/prism/drivers/brother_ql700/brother_ql/backends/network.py b/public/prism/drivers/brother_ql/brother_ql/backends/network.py similarity index 100% rename from public/prism/drivers/brother_ql700/brother_ql/backends/network.py rename to public/prism/drivers/brother_ql/brother_ql/backends/network.py diff --git a/public/prism/drivers/brother_ql700/brother_ql/backends/pyusb.py b/public/prism/drivers/brother_ql/brother_ql/backends/pyusb.py similarity index 100% rename from public/prism/drivers/brother_ql700/brother_ql/backends/pyusb.py rename to public/prism/drivers/brother_ql/brother_ql/backends/pyusb.py diff --git a/public/prism/drivers/brother_ql700/brother_ql/brother_ql_analyse.py b/public/prism/drivers/brother_ql/brother_ql/brother_ql_analyse.py similarity index 100% rename from public/prism/drivers/brother_ql700/brother_ql/brother_ql_analyse.py rename to public/prism/drivers/brother_ql/brother_ql/brother_ql_analyse.py diff --git a/public/prism/drivers/brother_ql700/brother_ql/brother_ql_create.py b/public/prism/drivers/brother_ql/brother_ql/brother_ql_create.py similarity index 100% rename from public/prism/drivers/brother_ql700/brother_ql/brother_ql_create.py rename to public/prism/drivers/brother_ql/brother_ql/brother_ql_create.py diff --git a/public/prism/drivers/brother_ql700/brother_ql/brother_ql_debug.py b/public/prism/drivers/brother_ql/brother_ql/brother_ql_debug.py similarity index 100% rename from public/prism/drivers/brother_ql700/brother_ql/brother_ql_debug.py rename to public/prism/drivers/brother_ql/brother_ql/brother_ql_debug.py diff --git a/public/prism/drivers/brother_ql700/brother_ql/brother_ql_info.py b/public/prism/drivers/brother_ql/brother_ql/brother_ql_info.py similarity index 100% rename from public/prism/drivers/brother_ql700/brother_ql/brother_ql_info.py rename to public/prism/drivers/brother_ql/brother_ql/brother_ql_info.py diff --git a/public/prism/drivers/brother_ql700/brother_ql/brother_ql_print.py b/public/prism/drivers/brother_ql/brother_ql/brother_ql_print.py similarity index 100% rename from public/prism/drivers/brother_ql700/brother_ql/brother_ql_print.py rename to public/prism/drivers/brother_ql/brother_ql/brother_ql_print.py diff --git a/public/prism/drivers/brother_ql700/brother_ql/cli.py b/public/prism/drivers/brother_ql/brother_ql/cli.py similarity index 100% rename from public/prism/drivers/brother_ql700/brother_ql/cli.py rename to public/prism/drivers/brother_ql/brother_ql/cli.py diff --git a/public/prism/drivers/brother_ql700/brother_ql/conversion.py b/public/prism/drivers/brother_ql/brother_ql/conversion.py similarity index 100% rename from public/prism/drivers/brother_ql700/brother_ql/conversion.py rename to public/prism/drivers/brother_ql/brother_ql/conversion.py diff --git a/public/prism/drivers/brother_ql700/brother_ql/devicedependent.py b/public/prism/drivers/brother_ql/brother_ql/devicedependent.py similarity index 100% rename from public/prism/drivers/brother_ql700/brother_ql/devicedependent.py rename to public/prism/drivers/brother_ql/brother_ql/devicedependent.py diff --git a/public/prism/drivers/brother_ql700/brother_ql/exceptions.py b/public/prism/drivers/brother_ql/brother_ql/exceptions.py similarity index 100% rename from public/prism/drivers/brother_ql700/brother_ql/exceptions.py rename to public/prism/drivers/brother_ql/brother_ql/exceptions.py diff --git a/public/prism/drivers/brother_ql700/brother_ql/helpers.py b/public/prism/drivers/brother_ql/brother_ql/helpers.py similarity index 100% rename from public/prism/drivers/brother_ql700/brother_ql/helpers.py rename to public/prism/drivers/brother_ql/brother_ql/helpers.py diff --git a/public/prism/drivers/brother_ql700/brother_ql/image_trafos.py b/public/prism/drivers/brother_ql/brother_ql/image_trafos.py similarity index 100% rename from public/prism/drivers/brother_ql700/brother_ql/image_trafos.py rename to public/prism/drivers/brother_ql/brother_ql/image_trafos.py diff --git a/public/prism/drivers/brother_ql700/brother_ql/labels.py b/public/prism/drivers/brother_ql/brother_ql/labels.py similarity index 100% rename from public/prism/drivers/brother_ql700/brother_ql/labels.py rename to public/prism/drivers/brother_ql/brother_ql/labels.py diff --git a/public/prism/drivers/brother_ql700/brother_ql/models.py b/public/prism/drivers/brother_ql/brother_ql/models.py similarity index 100% rename from public/prism/drivers/brother_ql700/brother_ql/models.py rename to public/prism/drivers/brother_ql/brother_ql/models.py diff --git a/public/prism/drivers/brother_ql700/brother_ql/output_helpers.py b/public/prism/drivers/brother_ql/brother_ql/output_helpers.py similarity index 100% rename from public/prism/drivers/brother_ql700/brother_ql/output_helpers.py rename to public/prism/drivers/brother_ql/brother_ql/output_helpers.py diff --git a/public/prism/drivers/brother_ql700/brother_ql/raster.py b/public/prism/drivers/brother_ql/brother_ql/raster.py similarity index 100% rename from public/prism/drivers/brother_ql700/brother_ql/raster.py rename to public/prism/drivers/brother_ql/brother_ql/raster.py diff --git a/public/prism/drivers/brother_ql700/brother_ql/reader.py b/public/prism/drivers/brother_ql/brother_ql/reader.py similarity index 100% rename from public/prism/drivers/brother_ql700/brother_ql/reader.py rename to public/prism/drivers/brother_ql/brother_ql/reader.py diff --git a/public/prism/drivers/brother_ql/hwdrv_brother_ql.py b/public/prism/drivers/brother_ql/hwdrv_brother_ql.py new file mode 100755 index 00000000..d4fef63a --- /dev/null +++ b/public/prism/drivers/brother_ql/hwdrv_brother_ql.py @@ -0,0 +1,440 @@ +#! /usr/bin/env python +# -*- coding: utf-8 -*- +""" +Sistemi Corporation, copyright, all rights reserved, 2020-2026 +Vivian Guthrie, Martin Guthrie + +USB Permissions: +When Prism runs from Docker, it has root permissions, so all is good. +However, when you are developing, your user account will not have USB permissions. +Use : sudo chmod -R 777 /dev/bus/usb/ +To unlock USB. This is not a great security solution. +Also need to: sudo adduser $USER lp +And then reboot for that to take effect. + +It is assumed only one (homogeneous) model of Brother printer is used in your deployment, and +the driver will only work with that model. + +""" +import logging +from PIL import Image, ImageDraw, ImageFont +import qrcode +import re +import os +import subprocess +import base64 +import pyudev +from core.const import PUB +from core.sys_log import pub_notice +from threading import Lock + +VERSION = "0.0.1" +DRIVER_TYPE = "Brother_QL" + +# According to PyPi brother_ql profile, these models are supported, +# QL-500 (✓), QL-550 (✓), QL-560 (✓), QL-570 (✓), QL-580N, QL-650TD, QL-700 (✓), +# QL-710W (✓), QL-720NW (✓), QL-800 (✓), QL-810W (✓), QL-820NWB (✓), QL-1050 (✓), and QL-1060N (✓) +# However, only the following has been tested in Prism, add or replace with your specific printer, +BROTHER_QL_MODELS = ["QL-700"] + + +class BrotherQL(object): + """ Brother Ql-XXX Helper Class + + """ + DRIVER_PATH = os.path.dirname(os.path.abspath(__file__)) + WORKING_PATH = os.path.join(DRIVER_PATH, "wip") + + def __init__(self, id, path, model="QL-700"): + self.logger = logging.getLogger("SC.{}".format(__class__.__name__)) + try: + if not os.path.exists(self.WORKING_PATH): + self.logger.info("creating {}".format(self.WORKING_PATH)) + os.makedirs(self.WORKING_PATH) + + except FileExistsError: + pass + + except Exception as e: + self.logger.error("Failed to make path for {}: {}".format(self.WORKING_PATH, e)) + notice = "ERROR path {} failed".format(self.WORKING_PATH) + pub_notice(notice, sender="{}._make_dirs".format(__class__.__name__), type=PUB.NOTICES_ERROR) + + self.lock = Lock() + self.path = path # this is the linux usb path, /dev/usb/lp0 + self.id = id + self.model = model + + def _filter_known_stderr_noise(self, stderr_text): + """ + Remove known harmless warnings from stderr while preserving all real errors. + """ + suppressed_markers = [ + "deprecation warning: brother_ql.devicedependent is deprecated", + ] + + kept_lines = [] + suppressed_count = 0 + + for line in stderr_text.splitlines(): + low = line.lower() + if any(marker in low for marker in suppressed_markers): + suppressed_count += 1 + else: + kept_lines.append(line) + + return kept_lines, suppressed_count + + def _log_filtered_stderr(self, prefix, stderr_bytes): + if not stderr_bytes: + return + + stderr_text = str(stderr_bytes, "utf-8", errors="replace") + kept_lines, suppressed_count = self._filter_known_stderr_noise(stderr_text) + + if not kept_lines: + if suppressed_count: + self.logger.debug("%s contained only suppressed warning(s): %d", prefix, suppressed_count) + return + + info_lines = [] + warning_lines = [] + error_lines = [] + + for line in kept_lines: + stripped = line.strip() + upper = stripped.upper() + + if upper.startswith("INFO:"): + info_lines.append(stripped) + elif upper.startswith("WARNING:"): + warning_lines.append(stripped) + elif upper.startswith("ERROR:") or upper.startswith("CRITICAL:"): + error_lines.append(stripped) + else: + # Unknown stderr format: keep conservative behavior. + error_lines.append(stripped) + + if info_lines: + self.logger.info("%s: %s", prefix, "\n".join(info_lines)) + if warning_lines: + self.logger.warning("%s: %s", prefix, "\n".join(warning_lines)) + if error_lines: + self.logger.error("%s: %s", prefix, "\n".join(error_lines)) + + def _create_barcode(self, string): + qr = qrcode.QRCode(version=1, + error_correction=qrcode.constants.ERROR_CORRECT_L, + box_size=5, + border=1) + + encrypted_info = "{}".format(string) + + qr.add_data(encrypted_info) + qr.make(fit=True) + img = qr.make_image(fill_color="black", back_color="white") + img_filename = os.path.join(self.WORKING_PATH, "qrcode.PNG") + img.save(img_filename) + return img_filename + + def _create_text(self, ruid, chan): + img = Image.new('RGB', (696, 309), color='white') + + d = ImageDraw.Draw(img) + font_path = os.path.join(self.DRIVER_PATH,'TIMES.TTF') + self.logger.info(font_path) + font = ImageFont.truetype(font_path, 50) + d.text((10, 10), "RUID:{}".format(chan), fill=(0, 0, 0), font=font) + font = ImageFont.truetype(font_path, 40) + d.text((10, 54), "{}".format(ruid), fill=(0, 0, 0), font=font) + img_file = os.path.join(self.WORKING_PATH, 'pil_text.png') + img.save(img_file) + return img_file + + def _create_label(self, text_path, barcode_path): + text = Image.open(text_path, 'r') + barcode = Image.open(barcode_path, 'r') + + label = Image.new('RGBA', (696, 280), color="white") + label.paste(text, (0, 0)) + label.paste(barcode, (540, 120)) + img_file = os.path.join(self.WORKING_PATH, 'label.png') + label.save(img_file) + self.logger.info(img_file) + self._save_as_base64(img_file) + return img_file + + def _save_as_base64(self, img_path): + with open("{}".format(img_path), 'rb') as image: + str = base64.b64encode(image.read()) + file = open(os.path.join(self.WORKING_PATH, "img.txt"), 'w') + file.write(str.decode("utf-8")) + file.close() + + def _print_label(self, img_path): + brother_ql = os.path.abspath(self.DRIVER_PATH + "/brother_ql") + + create_cmd = f'python3 {brother_ql}/brother_ql_create.py --model {self.model} {img_path} > {self.WORKING_PATH}/label.bin' + self.logger.debug("create_cmd: %s", create_cmd) + + try: + l_bin = subprocess.Popen( + [create_cmd], + cwd='.', + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + shell=True + ) + l_out, l_err = l_bin.communicate(timeout=30) + + except subprocess.TimeoutExpired: + self.logger.exception("Timeout while creating label.bin") + return False + + except Exception: + self.logger.exception("Unexpected error while creating label.bin") + return False + + if l_out: + self.logger.info(str(l_out, 'utf-8', errors='replace')) + self._log_filtered_stderr("create stderr", l_err) + + if l_bin.returncode: + self.logger.error("create.returncode: %s", l_bin.returncode) + return False + + print_cmd = [ + 'python3', + '{}/brother_ql_print.py'.format(brother_ql), + '{}/label.bin'.format(self.WORKING_PATH), + self.path + ] + self.logger.debug("print_cmd: %s", " ".join(print_cmd)) + + try: + printing = subprocess.Popen( + print_cmd, + cwd='.', + stdout=subprocess.PIPE, + stderr=subprocess.PIPE + ) + p_out, p_err = printing.communicate(timeout=30) + + except subprocess.TimeoutExpired: + self.logger.exception("Timeout while printing label") + return False + + except Exception: + self.logger.exception("Unexpected error while printing label") + return False + + if p_out: + self.logger.info(str(p_out, 'utf-8', errors='replace')) + + self._log_filtered_stderr("print stderr", p_err) + + if printing.returncode: + self.logger.error("printing.returncode: {}".format(printing.returncode)) + return False + + self.logger.info("printed successfully") + return True + + def print_ruid_barcode(self, ruid, chan=0): + """ Print a label with the RUID and a barcode representation of the RUID + + :param ruid: + :return: success + """ + with self.lock: + barcode_path = self._create_barcode(ruid) + text_path = self._create_text(ruid, chan) + label_path = self._create_label(text_path, barcode_path) + return self._print_label(label_path) + + def get_id_path(self): + """ Get ID and path + + :return: id, path + """ + return self.id, self.path + + +class HWDriver(object): + """ + HWDriver is a class that installs a HW driver into the shared state. + This is not the HW driver itself, just an installer. + + """ + SFN = os.path.basename(__file__) + + def __init__(self): + self.logger = logging.getLogger("{}".format(self.SFN)) + self.logger.info("Start") + self._num_chan = 0 + + def _attr_str(self, dev, key): + raw = dev.attributes.get(key) + if raw is None: + return "" + if isinstance(raw, (bytes, bytearray)): + return raw.decode("utf-8", errors="ignore").strip() + return str(raw).strip() + + def _usb_port_sort_key(self, usb_dev): + """ + Sort key based on USB topology path. + Example usb_dev.sys_name: + '1-2' -> (1, 2) + '1-2.3' -> (1, 2, 3) + '2-1.4.2' -> (2, 1, 4, 2) + """ + sys_name = getattr(usb_dev, "sys_name", "") or "" + nums = [int(x) for x in re.findall(r"\d+", sys_name)] + if nums: + return tuple(nums) + + # fallback: unknown topology goes last, keep deterministic ordering + devpath = getattr(usb_dev, "device_path", "") or "" + return (10_000, hash(devpath) & 0xFFFF) + + def _find_brother_printers(self, context): + """ + Returns a list of tuples sorted by USB hub port: + [(sort_key, devnode, manufacturer, product, usb_sys_name), ...] + """ + candidates = [] + seen_devnodes = set() + model_set = set(BROTHER_QL_MODELS) + + # usblp printer character devices typically show up in usbmisc + for lp_dev in context.list_devices(subsystem="usbmisc"): + devnode = lp_dev.device_node + if not devnode or not devnode.startswith("/dev/usb/lp"): + continue + if devnode in seen_devnodes: + continue + + usb_dev = lp_dev.find_parent(subsystem="usb", device_type="usb_device") + if usb_dev is None: + continue + + manufacturer = self._attr_str(usb_dev, "manufacturer") + product = self._attr_str(usb_dev, "product") + + if "Brother" not in manufacturer: + continue + if product not in model_set: + continue + + sort_key = self._usb_port_sort_key(usb_dev) + usb_sys_name = getattr(usb_dev, "sys_name", "") + candidates.append((sort_key, devnode, manufacturer, product, usb_sys_name)) + seen_devnodes.add(devnode) + + candidates.sort(key=lambda x: x[0]) + return candidates + + def discover_channels(self, scriptArgs=None): + """ Determine the number of channels, and populate hw drivers into shared state + + scriptArgs = {"only_one": true|false, "test_label": true|false} + + only_one = Only one Printed is expected to be found and will be shared + test_label = Print a test label before starting the test script + + This driver is for the Brother QL only. + Multiple printers will be ordered by the USB hub port they are connected to. Presumably, + each printer is associated with a Jig, so the order of the printers is important. + + [ {"id": i, # ~slot number of the channel (see Note 1) + "version": , # version of the driver + "hwdrv": , # instance of your hardware driver + + # optional + "close": None, # register a callback on closing the channel, or None + "play": jig_closed_detect # function for detecting jig closed + "show_pass_fail": jig_led # function for indicating pass/fail (like LED) + "show_msg": jig_display # function for indicating test status (like display) + + # not part of the required block + "unique_id": , # unique id of the hardware (for tracking purposes) + ... + }, ... + ] + + Note: + 1) The hw driver objects are expected to have an 'slot' field, the lowest + id is assigned to channel 0, the next highest to channel 1, etc + + :return: <#>, + where #: >0 number of channels, + 0 does not indicate num channels, like a shared hardware driver + <0 error + + list of drivers + """ + drivers = [] + id = 0 + context = pyudev.Context() + + printers = self._find_brother_printers(context) + for _, devnode, manufacturer, product, usb_sys_name in printers: + self.logger.info("Found printer: %s %s at %s (usb=%s)", manufacturer, product, devnode, usb_sys_name) + drivers.append({"id": id, + "version": VERSION, + "hwdrv": BrotherQL(id, devnode, model=product), + "play": None, + "show_pass_fail": None, + "show_msg": None, + "close": None}) + id += 1 + + if not drivers: + self.logger.error("printer not found") + pub_notice("HWDriver:{}: Error none found".format(self.SFN), sender="discover_channels", type=PUB.NOTICES_ERROR) + return -1, DRIVER_TYPE, [] + + # do not allow the test script validation step to succeed if can't print a test label + if scriptArgs and scriptArgs.get("test_label", False): + for d in drivers: + id, path = d["hwdrv"].get_id_path() + success = d["hwdrv"].print_ruid_barcode("id{}-{}".format(id, path)) + if not success: + self.logger.error("failed to print") + pub_notice("HWDriver:{}: failed to print".format(self.SFN), sender="discover_channels", type=PUB.NOTICES_ERROR) + return -1, DRIVER_TYPE, [] + + pub_notice("HWDriver:{}: found {}".format(self.SFN, id), + sender="discover_channels", + type=PUB.NOTICES_NORMAL) + + # Setting number of channels to zero means this device is shared across all test fixtures + if scriptArgs and scriptArgs.get('only_one', False): + self._num_chan = 0 + + return self._num_chan, DRIVER_TYPE, drivers + + def num_channels(self): + return self._num_chan + + def close(self): + self.logger.info("closed") + +# =============================================================================================== +# Debugging code +# - Test your hardware discovery here by running this file from PyCharm (be sure to set the working +# directory as ~/git/scripts, else imports will fail) +# - the purpose is to valid discover_channels() is working +# - you will have to comment out pub_notice() and the core import whilst using this +# +if __name__ == '__main__': + FORMAT = "%(relativeCreated)5d %(filename)30s:%(lineno)4d - %(name)30s:%(funcName)20s() %(levelname)-5.5s : %(message)s" + logging.basicConfig(level=logging.INFO, format=FORMAT ) + logger = logging.getLogger() + logger.setLevel(logging.INFO) + + d = HWDriver() + #d.discover_channels() + d.discover_channels({"test_label": True}) + logger.info("Number channels: {}".format(d.num_channels())) diff --git a/public/prism/drivers/brother_ql700/hwdrv_ql700.py b/public/prism/drivers/brother_ql700/hwdrv_ql700.py deleted file mode 100755 index 8410f4ab..00000000 --- a/public/prism/drivers/brother_ql700/hwdrv_ql700.py +++ /dev/null @@ -1,278 +0,0 @@ -#! /usr/bin/env python -# -*- coding: utf-8 -*- -""" -Sistemi Corporation, copyright, all rights reserved, 2020-2025 -Vivian Guthrie, Martin Guthrie - -USB Permissions: -When Prism runs from Docker, it has root permissions so all is good. -However, when you are developing, your user account will not have USB permissions. -Use : sudo chmod -R 777 /dev/bus/usb/ -To unlock USB. This is not a great security solution. -Also need to: sudo adduser $USER lp -And then reboot for that to take effect. - -""" -import logging -from PIL import Image, ImageDraw, ImageFont -import qrcode -import os -import subprocess -import base64 -import pyudev -from core.const import PUB -from core.sys_log import pub_notice -from threading import Lock - -VERSION = "0.0.1" -DRIVER_TYPE = "Brother_QL-700" - - -class BrotherQL700(object): - """ Brother Ql-700 Helper Class - - """ - DRIVER_PATH = "./public/prism/drivers/brother_ql700/" - WORKING_PATH = DRIVER_PATH + "wip/" - - def __init__(self, id, path): - self.logger = logging.getLogger("SC.{}".format(__class__.__name__)) - try: - if not os.path.exists(self.WORKING_PATH): - self.logger.info("creating {}".format(self.WORKING_PATH)) - os.makedirs(self.WORKING_PATH) - - except FileExistsError: - pass - - except Exception as e: - self.logger.error("Failed to make path for {}: {}".format(self.WORKING_PATH, e)) - notice = "ERROR path {} failed".format(self.WORKING_PATH) - pub_notice(notice, sender="{}._make_dirs".format(__class__.__name__), type=PUB.NOTICES_ERROR) - - self.lock = Lock() - self.path = path # this is the linux usb path, /dev/usb/lp0 - self.id = id - - def _create_barcode(self, string): - qr = qrcode.QRCode(version=1, - error_correction=qrcode.constants.ERROR_CORRECT_L, - box_size=5, - border=1) - - encrypted_info = "{}".format(string) - - qr.add_data(encrypted_info) - qr.make(fit=True) - img = qr.make_image(fill_color="black", back_color="white") - img.save(self.WORKING_PATH + "qrcode.PNG") - img_filename = os.path.abspath(self.WORKING_PATH + "qrcode.PNG") - return img_filename - - def _create_text(self, ruid, chan): - img = Image.new('RGB', (696, 309), color='white') - - d = ImageDraw.Draw(img) - font_path = self.DRIVER_PATH + 'TIMES.TTF' - self.logger.info(font_path) - font = ImageFont.truetype(font_path, 50) - d.text((10, 10), "RUID:{}".format(chan), fill=(0, 0, 0), font=font) - font = ImageFont.truetype(font_path, 40) - d.text((10, 54), "{}".format(ruid), fill=(0, 0, 0), font=font) - img.save(self.WORKING_PATH + 'pil_text.png') - img_file = os.path.abspath(self.WORKING_PATH + 'pil_text.png') - return img_file - - def _create_label(self, text_path, barcode_path): - text = Image.open(text_path, 'r') - barcode = Image.open(barcode_path, 'r') - - label = Image.new('RGBA', (696, 280), color="white") - label.paste(text, (0, 0)) - label.paste(barcode, (540, 120)) - label.save(self.WORKING_PATH + "label.png") - img_file = os.path.abspath(self.WORKING_PATH + 'label.png') - self.logger.info(img_file) - self._save_as_base64(img_file) - return img_file - - def _save_as_base64(self, img_path): - with open("{}".format(img_path), 'rb') as image: - str = base64.b64encode(image.read()) - file = open(self.WORKING_PATH + "img.txt", 'w') - file.write(str.decode("utf-8")) - file.close() - - def _print_label(self, img_path): - brother_ql = os.path.abspath(self.DRIVER_PATH + "/brother_ql") - - l_bin = subprocess.Popen( - ['python3 {}/brother_ql_create.py --model QL-700 {} > {}label.bin'.format(brother_ql, img_path, self.WORKING_PATH)], - cwd='.', stdout=subprocess.PIPE, shell=True) - - # brother_ql_create --model QL-700 label.png > label.bin - - self.logger.info(str(l_bin.communicate()[0], 'utf-8')) - - printing = subprocess.Popen( - ['python3', '{}/brother_ql_print.py'.format(brother_ql), '{}label.bin'.format(self.WORKING_PATH), self.path], - cwd='.', stdout=subprocess.PIPE) - - self.logger.info(str(printing.communicate()[0], 'utf-8')) - if printing.returncode: - self.logger.error("printing.returncode: {}".format(printing.returncode)) - return False - - self.logger.info("printed successfully") - return True - - def print_ruid_barcode(self, ruid, chan=0): - """ Print a label with the RUID and a barcode representation of the RUID - - :param ruid: - :return: success - """ - with self.lock: - barcode_path = self._create_barcode(ruid) - text_path = self._create_text(ruid, chan) - label_path = self._create_label(text_path, barcode_path) - return self._print_label(label_path) - - def get_id_path(self): - """ Get ID and path - - :return: id, path - """ - return self.id, self.path - - -class HWDriver(object): - """ - HWDriver is a class that installs a HW driver into the shared state. - This is not the HW driver itself, just an installer. - - """ - SFN = os.path.basename(__file__) - - def __init__(self): - self.logger = logging.getLogger("{}".format(self.SFN)) - self.logger.info("Start") - self._num_chan = 0 - - def discover_channels(self): - """ Determine the number of channels, and populate hw drivers into shared state - - This driver is for the Brother QL-700 only. - Multiple printers are not supported by this example driver. - Multiple printers could be supported by assigning the /dev/usb/lp# to the channel - number. - - [ {"id": i, # ~slot number of the channel (see Note 1) - "version": , # version of the driver - "hwdrv": , # instance of your hardware driver - - # optional - "close": None, # register a callback on closing the channel, or None - "play": jig_closed_detect # function for detecting jig closed - "show_pass_fail": jig_led # function for indicating pass/fail (like LED) - "show_msg": jig_display # function for indicating test status (like display) - - # not part of the required block - "unique_id": , # unique id of the hardware (for tracking purposes) - ... - }, ... - ] - - Note: - 1) The hw driver objects are expected to have an 'slot' field, the lowest - id is assigned to channel 0, the next highest to channel 1, etc - - :return: <#>, - where #: >0 number of channels, - 0 does not indicate num channels, like a shared hardware driver - <0 error - - list of drivers - """ - drivers = [] - id = 0 - context = pyudev.Context() - - for device in context.list_devices(): - if device.attributes.get("manufacturer", False) and "Brother" in device.attributes.get("manufacturer").decode('utf-8'): - if "QL-700" in str(device.attributes.get("product")): - - if False: # run snippet to show all kinds of useful stuff - for a in device.attributes.available_attributes: - self.logger.info(f"{a} - {str(device.attributes.get(a))}") - friendly_name = None - if 'ID_MODEL' in device: - friendly_name = device['ID_MODEL'] - elif 'ID_MODEL_ENC' in device: - friendly_name = device['ID_MODEL_ENC'] - elif 'ID_VENDOR' in device: - friendly_name = device['ID_VENDOR'] - elif 'ID_USB_PRODUCT' in device: - friendly_name = device['ID_USB_PRODUCT'] - elif 'ID_FS_LABEL' in device: # For file systems/storage devices - friendly_name = device['ID_FS_LABEL'] - self.logger.info(f"friendly_name: {friendly_name}") - - # TODO: find the path programmatically - # TODO: support multiple printers - p = '/dev/usb/lp0' - - drivers.append({"id": id, - "version": VERSION, - "hwdrv": BrotherQL700(id, p), - "play": None, - "show_pass_fail": None, - "show_msg": None, - "close": None}) - id += 1 - - if not drivers: - self.logger.error("printer not found") - pub_notice("HWDriver:{}: Error none found".format(self.SFN), sender="discover_channels", type=PUB.NOTICES_ERROR) - return -1, DRIVER_TYPE, [] - - # do not allow the test script validation step to succeed if can't print a test label - # TODO: make printing a test label a parameter from the script config - for d in drivers: - id, path = d["hwdrv"].get_id_path() - success = d["hwdrv"].print_ruid_barcode("id{}-{}".format(id, path)) - if not success: - self.logger.error("failed to print") - pub_notice("HWDriver:{}: failed to print".format(self.SFN), sender="discover_channels", type=PUB.NOTICES_ERROR) - return -1, DRIVER_TYPE, [] - - #pub_notice("HWDriver:{}: found {} {}".format(self.SFN, id, path), sender="discover_channels", type=PUB.NOTICES_NORMAL) - - # Setting number of channels to zero means this device is shared across all test fixtures - # TODO: change when support for printer per test jig is required/used - self._num_chan = 0 - - # by returning 0, it means this return values DOES not represent number of channels - return 0, DRIVER_TYPE, drivers - - def num_channels(self): - return self._num_chan - - def close(self): - self.logger.info("closed") - -# =============================================================================================== -# Debugging code -# - Test your hardware discover here by running this file from PyCharm (be sure to set the working -# directory as ~/git/scripts, else imports will fail) -# - the purpose is to valid discover_channels() is working -# - you will have to comment out pub_notice() and the core import whilst using this -# -if __name__ == '__main__': - logging.basicConfig() - logger = logging.getLogger() - logger.setLevel(logging.INFO) - - d = HWDriver() - d.discover_channels() - logger.info("Number channels: {}".format(d.num_channels())) diff --git a/public/prism/scripts/example/brother_v0/brother_v0.scr b/public/prism/scripts/example/brother_v0/brother_v0.scr index 554a3607..be0cea59 100644 --- a/public/prism/scripts/example/brother_v0/brother_v0.scr +++ b/public/prism/scripts/example/brother_v0/brother_v0.scr @@ -11,7 +11,7 @@ "config": { "fail_fast": false, "drivers": ["public.prism.drivers.fake.hwdrv_fake", - "public.prism.drivers.brother_ql700.hwdrv_ql700"] + ["public.prism.drivers.brother_ql.hwdrv_brother_ql", {"only_one": true}]] }, "tests": [ { diff --git a/public/prism/scripts/example/brother_v0/brthr00xx.py b/public/prism/scripts/example/brother_v0/brthr00xx.py index e3820a50..60d67e65 100644 --- a/public/prism/scripts/example/brother_v0/brthr00xx.py +++ b/public/prism/scripts/example/brother_v0/brthr00xx.py @@ -1,14 +1,14 @@ #! /usr/bin/env python # -*- coding: utf-8 -*- """ -Sistemi Corporation, copyright, all rights reserved, 2020 +Sistemi Corporation, copyright, all rights reserved, 2020-2026 Martin Guthrie """ import logging from core.test_item import TestItem from public.prism.api import ResultAPI - +from public.prism.drivers.brother_ql.hwdrv_brother_ql import DRIVER_TYPE as BROTHER_DRIVER # file and class name must match class brthr00xx(TestItem): @@ -34,9 +34,9 @@ def BRTHR0xxSETUP(self): """ ctx = self.item_start() # always first line of test - printer_info = self.shared_state.get_drivers(None, type="Brother_QL-700") + printer_info = self.shared_state.get_drivers(None, type=BROTHER_DRIVER) if not printer_info: - self.logger.error("Printer not found") + self.logger.error(f"driver {BROTHER_DRIVER} not found") self.item_end(ResultAPI.RECORD_RESULT_INTERNAL_ERROR) return