#!/usr/bin/env python3 """ Hawkbit DDI Client — lightweight pure-Python OTA update agent. ================================================================= No dependencies beyond Python 3 standard library. Runs on Android (adb shell) and Linux hosts. DDI API version: v1 Ref: https://github.com/eclipse-hawkbit/hawkbit Capabilities: • Poll Hawkbit for deployment actions • Resumable (Range-request) artifact download • SHA256 file integrity verification • Status progression: DOWNLOAD → DOWNLOADED → PROCEEDING → FINISHED / FAILURE • Auto-detects device serial number as controllerId (Android ro.serialno, DMI, machine-id) • Reports device attributes (osType, platform, hwRevision, swVersion, kernel…) to Hawkbit • Authentication: GatewayToken or TargetToken (also from file: --gateway-token @/path/to/key) Quick start: # Gateway token ddi-client.py --base-url https://hawkbit:8443 --tenant DEFAULT --gateway-token mykey # Target token (per-device) ddi-client.py --base-url http://10.0.0.1:8080 --tenant DEFAULT --target-token abc123 # One-shot poll ddi-client.py --base-url http://localhost:8080 --tenant DEFAULT --gateway-token k --once """ from __future__ import annotations import argparse import hashlib import json import logging import os import subprocess import ssl import sys import time import urllib.error import urllib.request from typing import Any, Dict, List, Optional, Tuple # ── logging ───────────────────────────────────────────────────────────────── logger = logging.getLogger("hawkbit-ddi") # ═══════════════════════════════════════════════════════════════════════════════ # HawkbitDDIClient # ═══════════════════════════════════════════════════════════════════════════════ class HawkbitDDIClient: """Lightweight DDI (Direct Device Integration) client for Eclipse Hawkbit.""" # DDI execution statuses (mapped to Hawkbit action states) EXEC_DOWNLOAD = "download" EXEC_DOWNLOADED = "downloaded" EXEC_PROCEEDING = "proceeding" EXEC_CLOSED = "closed" EXEC_CANCELED = "canceled" EXEC_REJECTED = "rejected" EXEC_SCHEDULED = "scheduled" EXEC_RESUMED = "resumed" RESULT_SUCCESS = "success" RESULT_FAILURE = "failure" RESULT_NONE = "none" # ── constructor ────────────────────────────────────────────────────── def __init__( self, base_url: str, tenant: str, controller_id: Optional[str] = None, auth_header: Optional[str] = None, download_dir: str = "/tmp/hawkbit", verify_sha256: bool = True, polling_interval: int = 60, ssl_verify: bool = True, resume_download: bool = True, ) -> None: self.base_url = base_url.rstrip("/") self.tenant = tenant self.controller_id = controller_id or self._detect_device_sn() self.auth_header = auth_header self.download_dir = download_dir self.verify_sha256 = verify_sha256 self.polling_interval = polling_interval self.ssl_verify = ssl_verify self.resume_download = resume_download self._attrs_reported = False os.makedirs(self.download_dir, exist_ok=True) # ── device serial detection ─────────────────────────────────────────── @staticmethod def _detect_device_sn() -> str: """Best-effort device serial number detection. Priority: Android ro.serialno → DMI product_serial → /etc/machine-id → hostname """ # 1. Android property try: out = subprocess.run( ["getprop", "ro.serialno"], capture_output=True, text=True, timeout=2, ) sn = out.stdout.strip() if sn: logger.info("Detected Android serial: %s", sn) return sn except (FileNotFoundError, subprocess.TimeoutExpired): pass # 2. Linux DMI product serial for p in ("/sys/class/dmi/id/product_serial", "/sys/devices/virtual/dmi/id/product_serial"): try: with open(p) as fh: sn = fh.read().strip() if sn and sn not in ("0", "Not Specified", "None", ""): logger.info("Detected DMI serial: %s", sn) return sn except (FileNotFoundError, PermissionError): pass # 3. /etc/machine-id (systemd) try: with open("/etc/machine-id") as fh: mid = fh.read().strip() if mid: logger.info("Using machine-id: %s", mid) return mid except (FileNotFoundError, PermissionError): pass # 4. Fallback — hostname import socket hostname = socket.gethostname() logger.warning("No serial found; falling back to hostname: %s", hostname) return hostname # ── device attributes detection ───────────────────────────────────── @staticmethod def _detect_device_attrs() -> Dict[str, str]: """Collect device metadata for Hawkbit target attributes. These attributes are used by Hawkbit target filters to auto-assign updates. Returns a dict of key-value pairs. """ attrs: Dict[str, str] = {} # ── OS type ───────────────────────────────────────────────── attrs["hostname"] = HawkbitDDIClient._safe_hostname() if os.path.exists("/system/build.prop"): attrs["osType"] = "android" else: attrs["osType"] = "linux" # ── platform / architecture ───────────────────────────────── try: import platform attrs["platform"] = platform.machine() except Exception: pass # ── kernel version ────────────────────────────────────────── try: attrs["kernel"] = os.uname().release except Exception: pass # ── Android: detailed attributes ──────────────────────────── if attrs.get("osType") == "android": for prop, attr_key in ( ("ro.build.version.release", "androidVersion"), ("ro.product.model", "productModel"), ("ro.product.manufacturer", "manufacturer"), ("ro.build.version.incremental", "buildNumber"), ("ro.build.fingerprint", "buildFingerprint"), ): try: out = subprocess.run( ["getprop", prop], capture_output=True, text=True, timeout=2, ) val = out.stdout.strip() if val: attrs[attr_key] = val except (FileNotFoundError, subprocess.TimeoutExpired): pass # ── Linux: DMI hardware info ──────────────────────────────── if attrs.get("osType") == "linux": for path, attr_key in ( ("/sys/class/dmi/id/product_name", "productName"), ("/sys/class/dmi/id/product_version", "productVersion"), ("/sys/class/dmi/id/product_sku", "productSku"), ("/sys/class/dmi/id/sys_vendor", "manufacturer"), ): try: with open(path) as fh: val = fh.read().strip() if val and val not in ("0", "Not Specified", "None", "", "System Product Name"): attrs[attr_key] = val except (FileNotFoundError, PermissionError): pass # hwRevision: combine product name + version if "productName" in attrs: rev = attrs["productName"] if "productVersion" in attrs: rev += " " + attrs["productVersion"] attrs["hwRevision"] = rev # ── current software version (if installed) ───────────────── version_file = "/etc/hawkbit/current_version" try: with open(version_file) as fh: v = fh.read().strip() if v: attrs["swVersion"] = v except (FileNotFoundError, PermissionError): pass # ── merge user-supplied attrs from file ───────────────────── attrs_file = "/etc/hawkbit/device_attrs.json" try: with open(attrs_file) as fh: extra = json.load(fh) if isinstance(extra, dict): attrs.update({k: str(v) for k, v in extra.items()}) except (FileNotFoundError, PermissionError, json.JSONDecodeError): pass return attrs @staticmethod def _safe_hostname() -> str: try: import socket return socket.gethostname() except Exception: return "unknown" # ── report device attributes to Hawkbit ────────────────────────────── def report_attributes(self) -> bool: """PUT /{tenant}/controller/v1/{controllerId}/configData Reports device metadata so Hawkbit can use target filters to assign updates. Uses ``mode: merge`` — only updates the keys sent, preserves existing attributes. """ attrs = self._detect_device_attrs() url = self._ddi_url(f"{self.controller_id}/configData") payload = {"mode": "merge", "data": attrs} logger.info("Reporting %d device attributes → %s", len(attrs), url) logger.debug("Attributes: %s", json.dumps(attrs)) resp = self._http(url, method="PUT", data=payload) return resp is not None def _ddi_url(self, path: str) -> str: """Build a DDI v1 URL: //controller/v1/""" return f"{self.base_url}/{self.tenant}/controller/v1/{path}" # ── low-level HTTP ──────────────────────────────────────────────────── def _http( self, url: str, method: str = "GET", data: Any = None, headers: Optional[Dict[str, str]] = None, stream: bool = False, ): """Make an HTTP request. Returns decoded JSON, or raw file handle when ``stream=True``. Returns ``None`` on HTTP errors.""" req_headers: Dict[str, str] = {"Accept": "application/hal+json, application/json"} if self.auth_header: req_headers["Authorization"] = self.auth_header if data is not None: req_headers["Content-Type"] = "application/json" if headers: req_headers.update(headers) body = None if data is None else json.dumps(data).encode("utf-8") ctx = None if not self.ssl_verify: ctx = ssl.create_default_context() ctx.check_hostname = False ctx.verify_mode = ssl.CERT_NONE req = urllib.request.Request(url, data=body, headers=req_headers, method=method) try: resp = urllib.request.urlopen(req, context=ctx) except urllib.error.HTTPError as exc: err_body = exc.read().decode("utf-8", errors="replace")[:500] logger.error("HTTP %d %s %s → %s", exc.code, method, url, err_body) return None if stream: return resp raw = resp.read() if not raw: return {} return json.loads(raw.decode("utf-8")) # ── helper: extract best download link from artifact HAL links ───────── @staticmethod def _pick_download_url(artifact: dict) -> Optional[str]: """Pick the preferred download URL from the artifact's _links. Priority: download (HTTPS) → download-http (HTTP) → construct from filename """ links = artifact.get("_links", {}) if not isinstance(links, dict): return None # HAL links are keyed by rel; values can be a single object or a list. for rel in ("download", "download-http"): entry = links.get(rel) if isinstance(entry, list): for item in entry: if isinstance(item, dict) and "href" in item: return item["href"] elif isinstance(entry, dict) and "href" in entry: return entry["href"] return None # ═══════════════════════════════════════════════════════════════════════ # Public API # ═══════════════════════════════════════════════════════════════════════ # ── poll ────────────────────────────────────────────────────────────── def poll(self) -> Optional[dict]: """GET /{tenant}/controller/v1/{controllerId} Returns parsed controller base or None on failure. """ url = self._ddi_url(self.controller_id) logger.debug("Polling GET %s", url) body = self._http(url) if body is None: return None return self._parse_poll(body) def _parse_poll(self, body: dict) -> dict: result: Dict[str, Any] = { "config": body.get("config", {}), "links": {}, } links = body.get("_links", {}) if isinstance(links, dict): for rel in ("deploymentBase", "cancelAction", "confirmationBase", "installedBase", "configData"): entry = links.get(rel) href = None if isinstance(entry, dict): href = entry.get("href") elif isinstance(entry, list) and entry: href = entry[0].get("href") if isinstance(entry[0], dict) else None if href: result["links"][rel] = href # Adopt server-suggested polling interval sleep_str = body.get("config", {}).get("polling", {}).get("sleep", "") if sleep_str: try: h, m, s = (int(x) for x in sleep_str.split(":")) seconds = h * 3600 + m * 60 + s if seconds > 0: self.polling_interval = seconds except ValueError: pass return result # ── get deployment base ─────────────────────────────────────────────── def get_deployment(self, deployment_url: str) -> Optional[dict]: """GET /{tenant}/controller/v1/{controllerId}/deploymentBase/{actionId} Returns structured deployment info or None. """ logger.debug("Fetching deployment: %s", deployment_url) body = self._http(deployment_url) if body is None: return None return self._parse_deployment(body) def _parse_deployment(self, body: dict) -> dict: info = body.get("deployment", {}) chunks = [] for ch in info.get("chunks", []): artifacts = [] for art in ch.get("artifacts", []): artifacts.append({ "filename": art["filename"], "size": art.get("size", 0), "hashes": art.get("hashes", {}), "_links": art.get("_links", {}), }) chunks.append({ "part": ch.get("part", ""), "version": ch.get("version", ""), "name": ch.get("name", ""), "artifacts": artifacts, }) return { "action_id": body.get("id"), "chunks": chunks, "handling": { "download": info.get("download", "attempt"), "update": info.get("update", "attempt"), "maintenanceWindow": info.get("maintenanceWindow"), }, } # ── download artifact ───────────────────────────────────────────────── def download_artifact(self, artifact: dict, action_id: str = "") -> Optional[str]: """Download one artifact with Range-request resume and SHA256 verification. Returns the local file path on success, or None on failure. """ filename = artifact["filename"] expected_sha256 = artifact.get("hashes", {}).get("sha256") expected_size = artifact.get("size", 0) # Resolve the download URL download_url = self._pick_download_url(artifact) if not download_url: # Fallback — construct from DDI path # Try to extract smId from any artifact HAL link logger.warning("No download link in artifact — using fallback URL") import re sm_id = "0" links = artifact.get("_links", {}) for entry in links.values(): href = None if isinstance(entry, dict) and "href" in entry: href = entry["href"] elif isinstance(entry, list): for item in entry: if isinstance(item, dict) and "href" in item: href = item["href"] break if href: m = re.search(r"/softwaremodules/(\d+)/", href) if m: sm_id = m.group(1) break download_url = self._ddi_url( f"{self.controller_id}/softwaremodules/{sm_id}/artifacts/{filename}" ) local_path = os.path.join(self.download_dir, filename) tmp_path = local_path + ".tmp" # Already downloaded and verified? if os.path.exists(local_path): if self._sha256_check(local_path, expected_sha256): logger.info("Already cached & verified: %s", local_path) return local_path logger.warning("Cached file fails SHA256 — re-downloading") # ── resume ──────────────────────────────────────────────────── downloaded = 0 if self.resume_download and os.path.exists(tmp_path): downloaded = os.path.getsize(tmp_path) logger.info("Resuming from byte %d", downloaded) req_headers: Dict[str, str] = {} if downloaded > 0: req_headers["Range"] = f"bytes={downloaded}-" logger.info("Downloading %s (%s bytes) → %s", filename, expected_size or "?", tmp_path) resp = self._http(download_url, headers=req_headers, stream=True) if resp is None: logger.error("Download request failed") return None # Determine total size (from Content-Range header) total = expected_size cr = resp.headers.get("Content-Range", "") if cr: try: total = int(cr.rsplit("/", 1)[-1]) except ValueError: pass mode = "ab" if downloaded > 0 else "wb" try: with open(tmp_path, mode) as fh: while True: chunk = resp.read(64 * 1024) if not chunk: break fh.write(chunk) downloaded += len(chunk) if total: pct = downloaded / total * 100 logger.debug(" %6.1f %% (%d / %d)", pct, downloaded, total) except Exception as exc: logger.error("Download interrupted: %s (tmp kept for resume)", exc) return None logger.info("Download finished — %d bytes", downloaded) # SHA256 if self.verify_sha256 and expected_sha256: if not self._sha256_check(tmp_path, expected_sha256): logger.error("SHA256 MISMATCH for %s", filename) return None # Atomic rename if os.path.exists(local_path): os.remove(local_path) os.rename(tmp_path, local_path) return local_path @staticmethod def _sha256_check(filepath: str, expected: Optional[str]) -> bool: if not expected: return True # nothing to compare against h = hashlib.sha256() try: with open(filepath, "rb") as fh: while True: chunk = fh.read(64 * 1024) if not chunk: break h.update(chunk) except OSError: return False actual = h.hexdigest() ok = actual.lower() == expected.lower() if ok: logger.info("SHA256 verified %s", actual) else: logger.error("SHA256 mismatch!\n expected: %s\n actual: %s", expected, actual) return ok # ── send feedback ───────────────────────────────────────────────────── def send_feedback( self, deployment_url: str, execution: str, result: str = RESULT_NONE, progress: Optional[Tuple[int, int]] = None, details: Optional[List[str]] = None, code: Optional[int] = None, ) -> Optional[dict]: """POST …/feedback — report execution status to the server. Args: deployment_url: full URL to the deployment base / cancel action / confirmation base. execution: one of the EXEC_* constants. result: ``success``, ``failure``, or ``none``. progress: ``(cnt, of)`` e.g. ``(2, 5)``. details: human-readable messages (shows in Hawkbit UI). code: optional numeric status code. """ # strip query params — they belong to GET, not POST /feedback base = deployment_url.split("?")[0] url = base + "/feedback" result_obj: Dict[str, Any] = {"finished": result} if progress: result_obj["progress"] = {"cnt": progress[0], "of": progress[1]} status: Dict[str, Any] = { "execution": execution, "result": result_obj, "details": details or [], } if code is not None: status["code"] = code payload = { "timestamp": int(time.time() * 1000), "status": status, } logger.info("Feedback %-12s / %-7s → %s", execution, result, url) return self._http(url, method="POST", data=payload) # ── process deployment ──────────────────────────────────────────────── def process_deployment(self, deployment_url: str) -> bool: """Run the full deployment lifecycle: fetch metadata → download → verify → report.""" dep = self.get_deployment(deployment_url) if not dep: logger.error("Cannot fetch deployment details") self.send_feedback(deployment_url, self.EXEC_CLOSED, self.RESULT_FAILURE, details=["Failed to fetch deployment base"]) return False action_id = dep["action_id"] handling = dep["handling"] logger.info("══ Action %s ══ download=%s update=%s", action_id, handling["download"], handling["update"]) # 1. DOWNLOAD phase — report start self.send_feedback(deployment_url, self.EXEC_DOWNLOAD, self.RESULT_NONE, details=["Starting download"]) # 2. Fetch every artifact downloaded: List[str] = [] for chunk in dep["chunks"]: logger.info("Chunk: %s v%s (%d artifacts)", chunk["name"], chunk["version"], len(chunk["artifacts"])) for idx, art in enumerate(chunk["artifacts"], 1): # Per-artifact progress self.send_feedback( deployment_url, self.EXEC_PROCEEDING, self.RESULT_NONE, progress=(idx, len(chunk["artifacts"])), details=[f"Downloading {art['filename']}"], ) local = self.download_artifact(art, action_id) if local is None: self.send_feedback( deployment_url, self.EXEC_CLOSED, self.RESULT_FAILURE, details=[f"Download failed for {art['filename']}"], ) return False downloaded.append(local) # 3. DOWNLOADED — all artifacts on disk, hashes ok self.send_feedback(deployment_url, self.EXEC_DOWNLOADED, self.RESULT_NONE, details=[f"Downloaded {len(downloaded)} artifact(s)"]) # 4. PROCEEDING — install self.send_feedback(deployment_url, self.EXEC_PROCEEDING, self.RESULT_NONE, details=["Installing update …"]) success = self._install(downloaded, handling) # 5. CLOSED — success or failure if success: self.send_feedback(deployment_url, self.EXEC_CLOSED, self.RESULT_SUCCESS, details=["Installation complete"], code=200) else: self.send_feedback(deployment_url, self.EXEC_CLOSED, self.RESULT_FAILURE, details=["Installation failed"], code=500) return success # ── install hook — override this for your device ────────────────────── def _install(self, files: List[str], handling: dict) -> bool: """Apply the downloaded update. **Override in subclass or monkey-patch.**""" logger.info("Install hook — files: %s", files) # Default: look for update.zip and report success. for fp in files: if os.path.basename(fp).lower().endswith(".zip"): # Example: subprocess.run(["unzip", "-o", fp, "-d", "/data/ota"], check=True) logger.info("Would install: %s", fp) return True return bool(files) # ── cancel action ───────────────────────────────────────────────────── def process_cancel(self, cancel_url: str) -> None: """Acknowledge a cancel action.""" self.send_feedback(cancel_url + "/feedback", self.EXEC_CANCELED, self.RESULT_NONE, details=["Cancel acknowledged"]) # ── confirmation (auto-confirm for headless devices) ────────────────── def process_confirmation(self, confirmation_url: str) -> None: """Auto-confirm a WAITING_FOR_CONFIRMATION action.""" logger.info("Auto-confirming: %s", confirmation_url) body = { "confirmation": "confirmed", "code": 200, "details": ["Auto-confirmed by DDI client"], } self._http(confirmation_url + "/feedback", method="POST", data=body) # ── main loop ───────────────────────────────────────────────────────── def run(self, *, daemon: bool = True, max_cycles: Optional[int] = None) -> bool: """Run the polling loop. Args: daemon: If False, poll once and return. max_cycles: Maximum number of poll cycles (None = forever). """ logger.info("╔══════════════════════════════════════════════════════════╗") logger.info("║ Hawkbit DDI Client v1.0 ║") logger.info("╠══════════════════════════════════════════════════════════╣") logger.info("║ Server: %s", self.base_url) logger.info("║ Tenant: %s", self.tenant) logger.info("║ Device: %s", self.controller_id) logger.info("║ Auth: %s", self.auth_header.split(" ", 1)[0] if self.auth_header else "none") logger.info("╚══════════════════════════════════════════════════════════╝") cycle = 0 while max_cycles is None or cycle < max_cycles: cycle += 1 logger.info("── Poll cycle %d ──", cycle) poll_result = self.poll() if poll_result is None: logger.warning("Poll failed; retrying in %ds", min(self.polling_interval, 30)) if not daemon: return False time.sleep(min(self.polling_interval, 30)) continue # ── report device attributes after first successful poll ── # Target is auto-created by GET /controller/v1/{SN} (findOrRegister…); # configData requires an existing target → poll first, then report. if not self._attrs_reported: self.report_attributes() self._attrs_reported = True links = poll_result.get("links", {}) if "deploymentBase" in links: self.process_deployment(links["deploymentBase"]) elif "cancelAction" in links: self.process_cancel(links["cancelAction"]) elif "confirmationBase" in links: self.process_confirmation(links["confirmationBase"]) if "deploymentBase" not in links and "cancelAction" not in links and "confirmationBase" not in links: logger.debug("No pending action.") if not daemon: return True logger.info("Sleeping %ds …", self.polling_interval) time.sleep(self.polling_interval) return True # ═══════════════════════════════════════════════════════════════════════════════ # CLI entry point # ═══════════════════════════════════════════════════════════════════════════════ def main(argv: Optional[List[str]] = None) -> None: ap = argparse.ArgumentParser( description="Hawkbit DDI Client — lightweight OTA agent (pure Python)", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: # Gateway token, poll every 30s %(prog)s -u https://hawkbit.example.com -t DEFAULT --gateway-token s3cret -i 30 # Target token, one-shot %(prog)s -u http://10.0.0.1:8080 -t DEFAULT --target-token tok --once # Custom controller-id, insecure SSL, custom download dir %(prog)s -u https://hawkbit:8443 -t prod --gateway-token k \\ --controller-id edge-042 -d /data/ota --no-ssl-verify -v """, ) # ── required ────────────────────────────────────────────────────── ap.add_argument("-u", "--base-url", required=True, help="Hawkbit base URL, e.g. https://hawkbit.example.com") ap.add_argument("-t", "--tenant", required=True, help="Tenant name, e.g. DEFAULT") # ── authentication (mutually-exclusive group would be nice but accepts either) ─ ap.add_argument("--gateway-token", metavar="TOKEN", help="Gateway security token → Authorization: GatewayToken TOKEN") ap.add_argument("--target-token", metavar="TOKEN", help="Target security token → Authorization: TargetToken TOKEN") # ── optional ────────────────────────────────────────────────────── ap.add_argument("--controller-id", help="Controller ID (default: auto-detect from device)") ap.add_argument("-d", "--download-dir", default="/tmp/hawkbit", help="Directory for downloaded artifacts [default: /tmp/hawkbit]") ap.add_argument("-i", "--polling-interval", type=int, default=60, help="Default polling interval in seconds (overridden by server) [default: 60]") ap.add_argument("--no-verify", action="store_true", help="Skip SHA256 verification") ap.add_argument("--no-resume", action="store_true", help="Disable resumable download") ap.add_argument("--no-ssl-verify", action="store_true", help="Disable TLS certificate verification (INSECURE)") ap.add_argument("--once", action="store_true", help="Poll exactly once then exit") ap.add_argument("-v", "--verbose", action="store_true", help="Debug-level logging") args = ap.parse_args(argv) # ── logging ─────────────────────────────────────────────────────── logging.basicConfig( level=logging.DEBUG if args.verbose else logging.INFO, format="%(asctime)s [%(levelname)-5s] %(name)s: %(message)s", datefmt="%H:%M:%S", ) # ── auth header ─────────────────────────────────────────────────── auth = None if args.gateway_token: token = args.gateway_token if token.startswith("@"): with open(token[1:]) as f: token = f.read().strip() logger.info("Read gateway token from file: %s", args.gateway_token) auth = f"GatewayToken {token}" elif args.target_token: token = args.target_token if token.startswith("@"): with open(token[1:]) as f: token = f.read().strip() logger.info("Read target token from file: %s", args.target_token) auth = f"TargetToken {token}" if not auth: logger.warning("No authentication token provided — server may reject requests") # ── run ─────────────────────────────────────────────────────────── client = HawkbitDDIClient( base_url=args.base_url, tenant=args.tenant, controller_id=args.controller_id, auth_header=auth, download_dir=args.download_dir, verify_sha256=not args.no_verify, polling_interval=args.polling_interval, ssl_verify=not args.no_ssl_verify, resume_download=not args.no_resume, ) try: ok = client.run(daemon=not args.once) sys.exit(0 if ok else 1) except KeyboardInterrupt: logger.info("Stopped (Ctrl+C)") sys.exit(0) if __name__ == "__main__": main()