aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorThomas Van Iseghem <[email protected]>2023-05-22 22:03:48 +0200
committerThomas Van Iseghem <[email protected]>2023-05-22 22:03:48 +0200
commit3c0a1adc2e83215bce92ff26a519e85dbfa552a9 (patch)
tree3104938651e0ae646453d0bd7948928448e7d0f7
parent5119b52c6c379b91a14cbf5ad7213431c0e32951 (diff)
downloadOpenCortex-3c0a1adc2e83215bce92ff26a519e85dbfa552a9.tar.gz
OpenCortex-3c0a1adc2e83215bce92ff26a519e85dbfa552a9.zip
Very basic CorOS downloader tool
-rw-r--r--Firmware-download/coros_downloader.py159
-rw-r--r--Firmware-download/requirements.txt1
2 files changed, 160 insertions, 0 deletions
diff --git a/Firmware-download/coros_downloader.py b/Firmware-download/coros_downloader.py
new file mode 100644
index 0000000..58edd5b
--- /dev/null
+++ b/Firmware-download/coros_downloader.py
@@ -0,0 +1,159 @@
+import urllib.request
+import datetime
+import hashlib
+import hmac
+import json
+import sys
+from tqdm import tqdm
+import requests
+
+download_dir = "./"
+updater_mode = "download"
+
+class CorOS_Downloader:
+ firmware_version_lut = {
+ "2.0.1": "cIR97IrfvF4fKfO5_KIm_j_aP2zGiHp_",
+ }
+
+ # Hardcoded values, might be moved to a config file
+ service = "execute-api"
+ host = "config-api.neuraldsp.com"
+ endpoint = "/api/v1/config/firmware/latest"
+ stage = ""
+ firmwareType = "update_QC_rev5.bin"
+ filename = "update"
+ access_key = "raphael_zpu"
+ secret_key = "a540MUwH+cjmBEg1mNFKKsXwoRerk/IPWoQP+UDFo0M="
+
+ def __init__(self, version, updatermode, download_dir):
+ self.firmwareVersion = self.firmware_version_lut[version]
+ self.download_dir = download_dir
+ self.updatermode = updatermode
+
+ @property
+ def url(self):
+ return "https://{}{}{}".format(self.host, self.stage, self.endpoint)
+
+ @property
+ def date(self):
+ return str(int((datetime.datetime.utcnow() - datetime.datetime(1970, 1, 1)).total_seconds()))
+
+ @property
+ def payload(self):
+ payload_dict = {
+ "deviceId": self.access_key,
+ "firmwareType": self.firmwareType,
+ "firmwareVersion": self.firmwareVersion
+ }
+ return json.dumps(payload_dict)
+
+ @property
+ def headers(self):
+ return 'content-type:application/json;host:{};x-cortex-date:{}'.format(self.host, self.date)
+
+ def sign_request(self):
+ string_to_sign = '{}\n{}\n{}\n{}\n{}'.format("POST", self.endpoint, "", hashlib.sha256(self.payload.encode("utf-8")).hexdigest(), self.headers)
+ return hmac.new(self.secret_key.encode('utf-8'), string_to_sign.encode('utf-8'), hashlib.sha256).hexdigest()
+
+
+ def build_authorization_header(self, signature):
+ return f'Key={self.access_key} Signature={signature}'
+
+ # Define the progress callback function
+ def _progress_callback(self, count, block_size, total_size):
+ if(count == 0):
+ self.progress_bar = tqdm(total=total_size, unit='B', unit_scale=True)
+ self.progress_bar.update(count * block_size - self.progress_bar.n)
+ if self.progress_bar.n >= total_size:
+ self.progress_bar.close()
+
+ def download_file(self, url, version_id):
+ file = f"{self.download_dir}/{self.filename}-{version_id}.bin" if self.filename else f"{self.download_dir}/{version_id}.bin"
+ urllib.request.urlretrieve(url, file, self._progress_callback)
+
+ def download_changelog(self, url):
+ file = '{}/changelog'.format(self.download_dir)
+
+ urllib.request.urlretrieve(url, file, self._progress_callback)
+
+ def check_update(self, output):
+ try:
+ parsed = json.loads(output)
+ url = parsed.get("url")
+ versionId = parsed.get("versionId")
+ message = parsed.get("message")
+ changelogUrl = parsed.get("changelogUrl")
+ if url:
+ print(f"New version available, versionId: {versionId}.")
+ print(f"url: {url}")
+
+ if changelogUrl:
+ print ("New changelog available")
+ print (f"changelogUrl: {changelogUrl}")
+ self.download_changelog(changelogUrl)
+ else:
+ print ("No new changelog found")
+ return True
+ else:
+ if message:
+ print("No new version available")
+ else:
+ print("Try again later, server seems to be busy")
+ return False
+ except:
+ print("An error occurred while trying to check for the latest version")
+
+ def updater_download(self, output):
+ try:
+ parsed = json.loads(output)
+ url = parsed.get("url")
+ versionId = parsed.get("versionId")
+ message = parsed.get("message")
+ if url:
+ response = urllib.request.urlopen(url)
+ size = response.getheader('Content-Length')
+ fname = open(f"{self.download_dir}/{'{}-{}.bin'.format(self.filename, versionId) if self.filename else '{}.bin'.format(versionId)}", "w+")
+ fname.write(f"{self.filename}-{versionId}.bin" if self.filename else f"{versionId}.bin")
+ fname.close()
+
+ print("Downloading new version, versionId: {}.".format(versionId))
+ self.download_file(url, versionId)
+ else:
+ if message:
+ print("No new version available")
+ else:
+ print("Try again later, server busy")
+ except:
+ print("An error occurred while trying to check for the latest version")
+
+ def initial_request(self):
+ if self.access_key is None or self.secret_key is None:
+ print('No api access key is available.')
+ sys.exit(1)
+
+ signature = self.sign_request()
+ authorization_header = self.build_authorization_header(signature)
+
+ headers = {
+ 'content-type': 'application/json',
+ 'host': self.host,
+ 'x-cortex-date': self.date,
+ 'authorization': authorization_header
+ }
+
+ response = requests.post(self.url, data=self.payload, headers=headers)
+ output = response.text
+ return output
+
+ def download(self):
+ initial_request = self.initial_request()
+ if self.updatermode == "check":
+ self.check_update(initial_request)
+ elif self.updatermode == "download":
+ self.updater_download(initial_request)
+
+
+if __name__ == "__main__":
+ print("CorOS Downloader started")
+ downloader = CorOS_Downloader("2.0.1", updater_mode, download_dir)
+ downloader.download() \ No newline at end of file
diff --git a/Firmware-download/requirements.txt b/Firmware-download/requirements.txt
new file mode 100644
index 0000000..fa9cf06
--- /dev/null
+++ b/Firmware-download/requirements.txt
@@ -0,0 +1 @@
+tqdm \ No newline at end of file