Files
hawkbit/hawkbit-compose/ddi-client.py
lingwei.kong 2a425196e6
Some checks failed
Mark & close stale issues / stale (push) Has been cancelled
Vulnerability Scan / trivy-scan (1.0) (push) Has been cancelled
Vulnerability Scan / trivy-scan (master) (push) Has been cancelled
License Scan / license-scan (push) Has been cancelled
提交api调用相关信息
2026-06-18 18:00:17 +08:00

845 lines
36 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Hawkbit DDI Client — lightweight pure-Python OTA update agent.
=================================================================
No dependencies beyond Python 3 standard library.
Runs on Android (adb shell) and Linux hosts.
DDI API version: v1
Ref: https://github.com/eclipse-hawkbit/hawkbit
Capabilities:
• Poll Hawkbit for deployment actions
• Resumable (Range-request) artifact download
• SHA256 file integrity verification
• Status progression: DOWNLOAD → DOWNLOADED → PROCEEDING → FINISHED / FAILURE
• Auto-detects device serial number as controllerId (Android ro.serialno, DMI, machine-id)
• Reports device attributes (osType, platform, hwRevision, swVersion, kernel…) to Hawkbit
• Authentication: GatewayToken or TargetToken (also from file: --gateway-token @/path/to/key)
Quick start:
# Gateway token
ddi-client.py --base-url https://hawkbit:8443 --tenant DEFAULT --gateway-token mykey
# Target token (per-device)
ddi-client.py --base-url http://10.0.0.1:8080 --tenant DEFAULT --target-token abc123
# One-shot poll
ddi-client.py --base-url http://localhost:8080 --tenant DEFAULT --gateway-token k --once
"""
from __future__ import annotations
import argparse
import hashlib
import json
import logging
import os
import subprocess
import ssl
import sys
import time
import urllib.error
import urllib.request
from typing import Any, Dict, List, Optional, Tuple
# ── logging ─────────────────────────────────────────────────────────────────
logger = logging.getLogger("hawkbit-ddi")
# ═══════════════════════════════════════════════════════════════════════════════
# HawkbitDDIClient
# ═══════════════════════════════════════════════════════════════════════════════
class HawkbitDDIClient:
"""Lightweight DDI (Direct Device Integration) client for Eclipse Hawkbit."""
# DDI execution statuses (mapped to Hawkbit action states)
EXEC_DOWNLOAD = "download"
EXEC_DOWNLOADED = "downloaded"
EXEC_PROCEEDING = "proceeding"
EXEC_CLOSED = "closed"
EXEC_CANCELED = "canceled"
EXEC_REJECTED = "rejected"
EXEC_SCHEDULED = "scheduled"
EXEC_RESUMED = "resumed"
RESULT_SUCCESS = "success"
RESULT_FAILURE = "failure"
RESULT_NONE = "none"
# ── constructor ──────────────────────────────────────────────────────
def __init__(
self,
base_url: str,
tenant: str,
controller_id: Optional[str] = None,
auth_header: Optional[str] = None,
download_dir: str = "/tmp/hawkbit",
verify_sha256: bool = True,
polling_interval: int = 60,
ssl_verify: bool = True,
resume_download: bool = True,
) -> None:
self.base_url = base_url.rstrip("/")
self.tenant = tenant
self.controller_id = controller_id or self._detect_device_sn()
self.auth_header = auth_header
self.download_dir = download_dir
self.verify_sha256 = verify_sha256
self.polling_interval = polling_interval
self.ssl_verify = ssl_verify
self.resume_download = resume_download
self._attrs_reported = False
os.makedirs(self.download_dir, exist_ok=True)
# ── device serial detection ───────────────────────────────────────────
@staticmethod
def _detect_device_sn() -> str:
"""Best-effort device serial number detection.
Priority: Android ro.serialno → DMI product_serial → /etc/machine-id → hostname
"""
# 1. Android property
try:
out = subprocess.run(
["getprop", "ro.serialno"],
capture_output=True, text=True, timeout=2,
)
sn = out.stdout.strip()
if sn:
logger.info("Detected Android serial: %s", sn)
return sn
except (FileNotFoundError, subprocess.TimeoutExpired):
pass
# 2. Linux DMI product serial
for p in ("/sys/class/dmi/id/product_serial",
"/sys/devices/virtual/dmi/id/product_serial"):
try:
with open(p) as fh:
sn = fh.read().strip()
if sn and sn not in ("0", "Not Specified", "None", ""):
logger.info("Detected DMI serial: %s", sn)
return sn
except (FileNotFoundError, PermissionError):
pass
# 3. /etc/machine-id (systemd)
try:
with open("/etc/machine-id") as fh:
mid = fh.read().strip()
if mid:
logger.info("Using machine-id: %s", mid)
return mid
except (FileNotFoundError, PermissionError):
pass
# 4. Fallback — hostname
import socket
hostname = socket.gethostname()
logger.warning("No serial found; falling back to hostname: %s", hostname)
return hostname
# ── device attributes detection ─────────────────────────────────────
@staticmethod
def _detect_device_attrs() -> Dict[str, str]:
"""Collect device metadata for Hawkbit target attributes.
These attributes are used by Hawkbit target filters to auto-assign updates.
Returns a dict of key-value pairs.
"""
attrs: Dict[str, str] = {}
# ── OS type ─────────────────────────────────────────────────
attrs["hostname"] = HawkbitDDIClient._safe_hostname()
if os.path.exists("/system/build.prop"):
attrs["osType"] = "android"
else:
attrs["osType"] = "linux"
# ── platform / architecture ─────────────────────────────────
try:
import platform
attrs["platform"] = platform.machine()
except Exception:
pass
# ── kernel version ──────────────────────────────────────────
try:
attrs["kernel"] = os.uname().release
except Exception:
pass
# ── Android: detailed attributes ────────────────────────────
if attrs.get("osType") == "android":
for prop, attr_key in (
("ro.build.version.release", "androidVersion"),
("ro.product.model", "productModel"),
("ro.product.manufacturer", "manufacturer"),
("ro.build.version.incremental", "buildNumber"),
("ro.build.fingerprint", "buildFingerprint"),
):
try:
out = subprocess.run(
["getprop", prop], capture_output=True, text=True, timeout=2,
)
val = out.stdout.strip()
if val:
attrs[attr_key] = val
except (FileNotFoundError, subprocess.TimeoutExpired):
pass
# ── Linux: DMI hardware info ────────────────────────────────
if attrs.get("osType") == "linux":
for path, attr_key in (
("/sys/class/dmi/id/product_name", "productName"),
("/sys/class/dmi/id/product_version", "productVersion"),
("/sys/class/dmi/id/product_sku", "productSku"),
("/sys/class/dmi/id/sys_vendor", "manufacturer"),
):
try:
with open(path) as fh:
val = fh.read().strip()
if val and val not in ("0", "Not Specified", "None", "", "System Product Name"):
attrs[attr_key] = val
except (FileNotFoundError, PermissionError):
pass
# hwRevision: combine product name + version
if "productName" in attrs:
rev = attrs["productName"]
if "productVersion" in attrs:
rev += " " + attrs["productVersion"]
attrs["hwRevision"] = rev
# ── current software version (if installed) ─────────────────
version_file = "/etc/hawkbit/current_version"
try:
with open(version_file) as fh:
v = fh.read().strip()
if v:
attrs["swVersion"] = v
except (FileNotFoundError, PermissionError):
pass
# ── merge user-supplied attrs from file ─────────────────────
attrs_file = "/etc/hawkbit/device_attrs.json"
try:
with open(attrs_file) as fh:
extra = json.load(fh)
if isinstance(extra, dict):
attrs.update({k: str(v) for k, v in extra.items()})
except (FileNotFoundError, PermissionError, json.JSONDecodeError):
pass
return attrs
@staticmethod
def _safe_hostname() -> str:
try:
import socket
return socket.gethostname()
except Exception:
return "unknown"
# ── report device attributes to Hawkbit ──────────────────────────────
def report_attributes(self) -> bool:
"""PUT /{tenant}/controller/v1/{controllerId}/configData
Reports device metadata so Hawkbit can use target filters to assign updates.
Uses ``mode: merge`` — only updates the keys sent, preserves existing attributes.
"""
attrs = self._detect_device_attrs()
url = self._ddi_url(f"{self.controller_id}/configData")
payload = {"mode": "merge", "data": attrs}
logger.info("Reporting %d device attributes → %s", len(attrs), url)
logger.debug("Attributes: %s", json.dumps(attrs))
resp = self._http(url, method="PUT", data=payload)
return resp is not None
def _ddi_url(self, path: str) -> str:
"""Build a DDI v1 URL: <base>/<tenant>/controller/v1/<path>"""
return f"{self.base_url}/{self.tenant}/controller/v1/{path}"
# ── low-level HTTP ────────────────────────────────────────────────────
def _http(
self,
url: str,
method: str = "GET",
data: Any = None,
headers: Optional[Dict[str, str]] = None,
stream: bool = False,
):
"""Make an HTTP request. Returns decoded JSON, or raw file handle when
``stream=True``. Returns ``None`` on HTTP errors."""
req_headers: Dict[str, str] = {"Accept": "application/hal+json, application/json"}
if self.auth_header:
req_headers["Authorization"] = self.auth_header
if data is not None:
req_headers["Content-Type"] = "application/json"
if headers:
req_headers.update(headers)
body = None if data is None else json.dumps(data).encode("utf-8")
ctx = None
if not self.ssl_verify:
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
req = urllib.request.Request(url, data=body, headers=req_headers, method=method)
try:
resp = urllib.request.urlopen(req, context=ctx)
except urllib.error.HTTPError as exc:
err_body = exc.read().decode("utf-8", errors="replace")[:500]
logger.error("HTTP %d %s %s%s", exc.code, method, url, err_body)
return None
if stream:
return resp
raw = resp.read()
if not raw:
return {}
return json.loads(raw.decode("utf-8"))
# ── helper: extract best download link from artifact HAL links ─────────
@staticmethod
def _pick_download_url(artifact: dict) -> Optional[str]:
"""Pick the preferred download URL from the artifact's _links.
Priority: download (HTTPS) → download-http (HTTP) → construct from filename
"""
links = artifact.get("_links", {})
if not isinstance(links, dict):
return None
# HAL links are keyed by rel; values can be a single object or a list.
for rel in ("download", "download-http"):
entry = links.get(rel)
if isinstance(entry, list):
for item in entry:
if isinstance(item, dict) and "href" in item:
return item["href"]
elif isinstance(entry, dict) and "href" in entry:
return entry["href"]
return None
# ═══════════════════════════════════════════════════════════════════════
# Public API
# ═══════════════════════════════════════════════════════════════════════
# ── poll ──────────────────────────────────────────────────────────────
def poll(self) -> Optional[dict]:
"""GET /{tenant}/controller/v1/{controllerId}
Returns parsed controller base or None on failure.
"""
url = self._ddi_url(self.controller_id)
logger.debug("Polling GET %s", url)
body = self._http(url)
if body is None:
return None
return self._parse_poll(body)
def _parse_poll(self, body: dict) -> dict:
result: Dict[str, Any] = {
"config": body.get("config", {}),
"links": {},
}
links = body.get("_links", {})
if isinstance(links, dict):
for rel in ("deploymentBase", "cancelAction",
"confirmationBase", "installedBase", "configData"):
entry = links.get(rel)
href = None
if isinstance(entry, dict):
href = entry.get("href")
elif isinstance(entry, list) and entry:
href = entry[0].get("href") if isinstance(entry[0], dict) else None
if href:
result["links"][rel] = href
# Adopt server-suggested polling interval
sleep_str = body.get("config", {}).get("polling", {}).get("sleep", "")
if sleep_str:
try:
h, m, s = (int(x) for x in sleep_str.split(":"))
seconds = h * 3600 + m * 60 + s
if seconds > 0:
self.polling_interval = seconds
except ValueError:
pass
return result
# ── get deployment base ───────────────────────────────────────────────
def get_deployment(self, deployment_url: str) -> Optional[dict]:
"""GET /{tenant}/controller/v1/{controllerId}/deploymentBase/{actionId}
Returns structured deployment info or None.
"""
logger.debug("Fetching deployment: %s", deployment_url)
body = self._http(deployment_url)
if body is None:
return None
return self._parse_deployment(body)
def _parse_deployment(self, body: dict) -> dict:
info = body.get("deployment", {})
chunks = []
for ch in info.get("chunks", []):
artifacts = []
for art in ch.get("artifacts", []):
artifacts.append({
"filename": art["filename"],
"size": art.get("size", 0),
"hashes": art.get("hashes", {}),
"_links": art.get("_links", {}),
})
chunks.append({
"part": ch.get("part", ""),
"version": ch.get("version", ""),
"name": ch.get("name", ""),
"artifacts": artifacts,
})
return {
"action_id": body.get("id"),
"chunks": chunks,
"handling": {
"download": info.get("download", "attempt"),
"update": info.get("update", "attempt"),
"maintenanceWindow": info.get("maintenanceWindow"),
},
}
# ── download artifact ─────────────────────────────────────────────────
def download_artifact(self, artifact: dict, action_id: str = "") -> Optional[str]:
"""Download one artifact with Range-request resume and SHA256 verification.
Returns the local file path on success, or None on failure.
"""
filename = artifact["filename"]
expected_sha256 = artifact.get("hashes", {}).get("sha256")
expected_size = artifact.get("size", 0)
# Resolve the download URL
download_url = self._pick_download_url(artifact)
if not download_url:
# Fallback — construct from DDI path
# Try to extract smId from any artifact HAL link
logger.warning("No download link in artifact — using fallback URL")
import re
sm_id = "0"
links = artifact.get("_links", {})
for entry in links.values():
href = None
if isinstance(entry, dict) and "href" in entry:
href = entry["href"]
elif isinstance(entry, list):
for item in entry:
if isinstance(item, dict) and "href" in item:
href = item["href"]
break
if href:
m = re.search(r"/softwaremodules/(\d+)/", href)
if m:
sm_id = m.group(1)
break
download_url = self._ddi_url(
f"{self.controller_id}/softwaremodules/{sm_id}/artifacts/{filename}"
)
local_path = os.path.join(self.download_dir, filename)
tmp_path = local_path + ".tmp"
# Already downloaded and verified?
if os.path.exists(local_path):
if self._sha256_check(local_path, expected_sha256):
logger.info("Already cached & verified: %s", local_path)
return local_path
logger.warning("Cached file fails SHA256 — re-downloading")
# ── resume ────────────────────────────────────────────────────
downloaded = 0
if self.resume_download and os.path.exists(tmp_path):
downloaded = os.path.getsize(tmp_path)
logger.info("Resuming from byte %d", downloaded)
req_headers: Dict[str, str] = {}
if downloaded > 0:
req_headers["Range"] = f"bytes={downloaded}-"
logger.info("Downloading %s (%s bytes) → %s",
filename, expected_size or "?", tmp_path)
resp = self._http(download_url, headers=req_headers, stream=True)
if resp is None:
logger.error("Download request failed")
return None
# Determine total size (from Content-Range header)
total = expected_size
cr = resp.headers.get("Content-Range", "")
if cr:
try:
total = int(cr.rsplit("/", 1)[-1])
except ValueError:
pass
mode = "ab" if downloaded > 0 else "wb"
try:
with open(tmp_path, mode) as fh:
while True:
chunk = resp.read(64 * 1024)
if not chunk:
break
fh.write(chunk)
downloaded += len(chunk)
if total:
pct = downloaded / total * 100
logger.debug(" %6.1f %% (%d / %d)", pct, downloaded, total)
except Exception as exc:
logger.error("Download interrupted: %s (tmp kept for resume)", exc)
return None
logger.info("Download finished — %d bytes", downloaded)
# SHA256
if self.verify_sha256 and expected_sha256:
if not self._sha256_check(tmp_path, expected_sha256):
logger.error("SHA256 MISMATCH for %s", filename)
return None
# Atomic rename
if os.path.exists(local_path):
os.remove(local_path)
os.rename(tmp_path, local_path)
return local_path
@staticmethod
def _sha256_check(filepath: str, expected: Optional[str]) -> bool:
if not expected:
return True # nothing to compare against
h = hashlib.sha256()
try:
with open(filepath, "rb") as fh:
while True:
chunk = fh.read(64 * 1024)
if not chunk:
break
h.update(chunk)
except OSError:
return False
actual = h.hexdigest()
ok = actual.lower() == expected.lower()
if ok:
logger.info("SHA256 verified %s", actual)
else:
logger.error("SHA256 mismatch!\n expected: %s\n actual: %s", expected, actual)
return ok
# ── send feedback ─────────────────────────────────────────────────────
def send_feedback(
self,
deployment_url: str,
execution: str,
result: str = RESULT_NONE,
progress: Optional[Tuple[int, int]] = None,
details: Optional[List[str]] = None,
code: Optional[int] = None,
) -> Optional[dict]:
"""POST …/feedback — report execution status to the server.
Args:
deployment_url: full URL to the deployment base / cancel action / confirmation base.
execution: one of the EXEC_* constants.
result: ``success``, ``failure``, or ``none``.
progress: ``(cnt, of)`` e.g. ``(2, 5)``.
details: human-readable messages (shows in Hawkbit UI).
code: optional numeric status code.
"""
# strip query params — they belong to GET, not POST /feedback
base = deployment_url.split("?")[0]
url = base + "/feedback"
result_obj: Dict[str, Any] = {"finished": result}
if progress:
result_obj["progress"] = {"cnt": progress[0], "of": progress[1]}
status: Dict[str, Any] = {
"execution": execution,
"result": result_obj,
"details": details or [],
}
if code is not None:
status["code"] = code
payload = {
"timestamp": int(time.time() * 1000),
"status": status,
}
logger.info("Feedback %-12s / %-7s%s", execution, result, url)
return self._http(url, method="POST", data=payload)
# ── process deployment ────────────────────────────────────────────────
def process_deployment(self, deployment_url: str) -> bool:
"""Run the full deployment lifecycle: fetch metadata → download → verify → report."""
dep = self.get_deployment(deployment_url)
if not dep:
logger.error("Cannot fetch deployment details")
self.send_feedback(deployment_url, self.EXEC_CLOSED, self.RESULT_FAILURE,
details=["Failed to fetch deployment base"])
return False
action_id = dep["action_id"]
handling = dep["handling"]
logger.info("══ Action %s ══ download=%s update=%s",
action_id, handling["download"], handling["update"])
# 1. DOWNLOAD phase — report start
self.send_feedback(deployment_url, self.EXEC_DOWNLOAD, self.RESULT_NONE,
details=["Starting download"])
# 2. Fetch every artifact
downloaded: List[str] = []
for chunk in dep["chunks"]:
logger.info("Chunk: %s v%s (%d artifacts)",
chunk["name"], chunk["version"], len(chunk["artifacts"]))
for idx, art in enumerate(chunk["artifacts"], 1):
# Per-artifact progress
self.send_feedback(
deployment_url, self.EXEC_PROCEEDING, self.RESULT_NONE,
progress=(idx, len(chunk["artifacts"])),
details=[f"Downloading {art['filename']}"],
)
local = self.download_artifact(art, action_id)
if local is None:
self.send_feedback(
deployment_url, self.EXEC_CLOSED, self.RESULT_FAILURE,
details=[f"Download failed for {art['filename']}"],
)
return False
downloaded.append(local)
# 3. DOWNLOADED — all artifacts on disk, hashes ok
self.send_feedback(deployment_url, self.EXEC_DOWNLOADED, self.RESULT_NONE,
details=[f"Downloaded {len(downloaded)} artifact(s)"])
# 4. PROCEEDING — install
self.send_feedback(deployment_url, self.EXEC_PROCEEDING, self.RESULT_NONE,
details=["Installing update …"])
success = self._install(downloaded, handling)
# 5. CLOSED — success or failure
if success:
self.send_feedback(deployment_url, self.EXEC_CLOSED, self.RESULT_SUCCESS,
details=["Installation complete"], code=200)
else:
self.send_feedback(deployment_url, self.EXEC_CLOSED, self.RESULT_FAILURE,
details=["Installation failed"], code=500)
return success
# ── install hook — override this for your device ──────────────────────
def _install(self, files: List[str], handling: dict) -> bool:
"""Apply the downloaded update. **Override in subclass or monkey-patch.**"""
logger.info("Install hook — files: %s", files)
# Default: look for update.zip and report success.
for fp in files:
if os.path.basename(fp).lower().endswith(".zip"):
# Example: subprocess.run(["unzip", "-o", fp, "-d", "/data/ota"], check=True)
logger.info("Would install: %s", fp)
return True
return bool(files)
# ── cancel action ─────────────────────────────────────────────────────
def process_cancel(self, cancel_url: str) -> None:
"""Acknowledge a cancel action."""
self.send_feedback(cancel_url + "/feedback", self.EXEC_CANCELED,
self.RESULT_NONE, details=["Cancel acknowledged"])
# ── confirmation (auto-confirm for headless devices) ──────────────────
def process_confirmation(self, confirmation_url: str) -> None:
"""Auto-confirm a WAITING_FOR_CONFIRMATION action."""
logger.info("Auto-confirming: %s", confirmation_url)
body = {
"confirmation": "confirmed",
"code": 200,
"details": ["Auto-confirmed by DDI client"],
}
self._http(confirmation_url + "/feedback", method="POST", data=body)
# ── main loop ─────────────────────────────────────────────────────────
def run(self, *, daemon: bool = True, max_cycles: Optional[int] = None) -> bool:
"""Run the polling loop.
Args:
daemon: If False, poll once and return.
max_cycles: Maximum number of poll cycles (None = forever).
"""
logger.info("╔══════════════════════════════════════════════════════════╗")
logger.info("║ Hawkbit DDI Client v1.0 ║")
logger.info("╠══════════════════════════════════════════════════════════╣")
logger.info("║ Server: %s", self.base_url)
logger.info("║ Tenant: %s", self.tenant)
logger.info("║ Device: %s", self.controller_id)
logger.info("║ Auth: %s",
self.auth_header.split(" ", 1)[0] if self.auth_header else "none")
logger.info("╚══════════════════════════════════════════════════════════╝")
cycle = 0
while max_cycles is None or cycle < max_cycles:
cycle += 1
logger.info("── Poll cycle %d ──", cycle)
poll_result = self.poll()
if poll_result is None:
logger.warning("Poll failed; retrying in %ds", min(self.polling_interval, 30))
if not daemon:
return False
time.sleep(min(self.polling_interval, 30))
continue
# ── report device attributes after first successful poll ──
# Target is auto-created by GET /controller/v1/{SN} (findOrRegister…);
# configData requires an existing target → poll first, then report.
if not self._attrs_reported:
self.report_attributes()
self._attrs_reported = True
links = poll_result.get("links", {})
if "deploymentBase" in links:
self.process_deployment(links["deploymentBase"])
elif "cancelAction" in links:
self.process_cancel(links["cancelAction"])
elif "confirmationBase" in links:
self.process_confirmation(links["confirmationBase"])
if "deploymentBase" not in links and "cancelAction" not in links and "confirmationBase" not in links:
logger.debug("No pending action.")
if not daemon:
return True
logger.info("Sleeping %ds …", self.polling_interval)
time.sleep(self.polling_interval)
return True
# ═══════════════════════════════════════════════════════════════════════════════
# CLI entry point
# ═══════════════════════════════════════════════════════════════════════════════
def main(argv: Optional[List[str]] = None) -> None:
ap = argparse.ArgumentParser(
description="Hawkbit DDI Client — lightweight OTA agent (pure Python)",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Gateway token, poll every 30s
%(prog)s -u https://hawkbit.example.com -t DEFAULT --gateway-token s3cret -i 30
# Target token, one-shot
%(prog)s -u http://10.0.0.1:8080 -t DEFAULT --target-token tok --once
# Custom controller-id, insecure SSL, custom download dir
%(prog)s -u https://hawkbit:8443 -t prod --gateway-token k \\
--controller-id edge-042 -d /data/ota --no-ssl-verify -v
""",
)
# ── required ──────────────────────────────────────────────────────
ap.add_argument("-u", "--base-url", required=True,
help="Hawkbit base URL, e.g. https://hawkbit.example.com")
ap.add_argument("-t", "--tenant", required=True,
help="Tenant name, e.g. DEFAULT")
# ── authentication (mutually-exclusive group would be nice but accepts either) ─
ap.add_argument("--gateway-token", metavar="TOKEN",
help="Gateway security token → Authorization: GatewayToken TOKEN")
ap.add_argument("--target-token", metavar="TOKEN",
help="Target security token → Authorization: TargetToken TOKEN")
# ── optional ──────────────────────────────────────────────────────
ap.add_argument("--controller-id",
help="Controller ID (default: auto-detect from device)")
ap.add_argument("-d", "--download-dir", default="/tmp/hawkbit",
help="Directory for downloaded artifacts [default: /tmp/hawkbit]")
ap.add_argument("-i", "--polling-interval", type=int, default=60,
help="Default polling interval in seconds (overridden by server) [default: 60]")
ap.add_argument("--no-verify", action="store_true",
help="Skip SHA256 verification")
ap.add_argument("--no-resume", action="store_true",
help="Disable resumable download")
ap.add_argument("--no-ssl-verify", action="store_true",
help="Disable TLS certificate verification (INSECURE)")
ap.add_argument("--once", action="store_true",
help="Poll exactly once then exit")
ap.add_argument("-v", "--verbose", action="store_true",
help="Debug-level logging")
args = ap.parse_args(argv)
# ── logging ───────────────────────────────────────────────────────
logging.basicConfig(
level=logging.DEBUG if args.verbose else logging.INFO,
format="%(asctime)s [%(levelname)-5s] %(name)s: %(message)s",
datefmt="%H:%M:%S",
)
# ── auth header ───────────────────────────────────────────────────
auth = None
if args.gateway_token:
token = args.gateway_token
if token.startswith("@"):
with open(token[1:]) as f:
token = f.read().strip()
logger.info("Read gateway token from file: %s", args.gateway_token)
auth = f"GatewayToken {token}"
elif args.target_token:
token = args.target_token
if token.startswith("@"):
with open(token[1:]) as f:
token = f.read().strip()
logger.info("Read target token from file: %s", args.target_token)
auth = f"TargetToken {token}"
if not auth:
logger.warning("No authentication token provided — server may reject requests")
# ── run ───────────────────────────────────────────────────────────
client = HawkbitDDIClient(
base_url=args.base_url,
tenant=args.tenant,
controller_id=args.controller_id,
auth_header=auth,
download_dir=args.download_dir,
verify_sha256=not args.no_verify,
polling_interval=args.polling_interval,
ssl_verify=not args.no_ssl_verify,
resume_download=not args.no_resume,
)
try:
ok = client.run(daemon=not args.once)
sys.exit(0 if ok else 1)
except KeyboardInterrupt:
logger.info("Stopped (Ctrl+C)")
sys.exit(0)
if __name__ == "__main__":
main()