From 22e68c7a2de29bea052b15290d28158b956d3c33 Mon Sep 17 00:00:00 2001 From: Bernhard Radermacher Date: Wed, 8 Oct 2025 11:31:25 +0000 Subject: [PATCH] initial --- cipher.plantuml | 90 ++++++++ pyproject.toml | 18 +- src/kion_vault/__init__.py | 1 + src/kion_vault/_id_name_lookup.py | 18 ++ src/kion_vault/attachment.py | 22 ++ src/kion_vault/cipher.py | 329 ++++++++++++++++++++++++++++++ src/kion_vault/cipher_string.py | 109 ++++++++++ src/kion_vault/exceptions.py | 24 +++ src/kion_vault/key.py | 27 +++ src/kion_vault/organization.py | 16 ++ src/kion_vault/vault.py | 120 +++++++++++ 11 files changed, 773 insertions(+), 1 deletion(-) create mode 100644 cipher.plantuml create mode 100644 src/kion_vault/_id_name_lookup.py create mode 100644 src/kion_vault/attachment.py create mode 100644 src/kion_vault/cipher.py create mode 100644 src/kion_vault/cipher_string.py create mode 100644 src/kion_vault/exceptions.py create mode 100644 src/kion_vault/key.py create mode 100644 src/kion_vault/organization.py create mode 100644 src/kion_vault/vault.py diff --git a/cipher.plantuml b/cipher.plantuml new file mode 100644 index 0000000..7da0d38 --- /dev/null +++ b/cipher.plantuml @@ -0,0 +1,90 @@ +@startuml + +hide empty members +skinparam groupInheritance 2 +skinparam linetype polyline + + + +abstract IdNameLookup { + + get(id: str = None, name: str = None) : **IdNameLookup** {static} +} + +together { + class Organization { + - id + - key + + name + } + + abstract Cipher { + - id + - key + + name + + organization + + attachments + + fields + + notes + } +} + +class Login { + + username + + password + + uri + + uris +} + +class SecureNote + +class Card { + + cardholder_name + + number + + brand + + expiration_month + + expiration_year + + code +} + + +class Identity { + + title + + first_name + + middle_name + + last_name + + username + + company + + ssn + + passport_number + + license_number + + email + + phone + + address1 + + address2 + + address3 + + city + + state + + postal_code + + country +} + + + + +class SshKey { + + private_key + + public_key + + key_fingerprint +} + + +IdNameLookup <|-- Cipher +IdNameLookup <|-- Organization +Cipher <|-- Card +Cipher <|-- Identity +Cipher <|-- Login +Cipher <|-- SecureNote +Cipher <|-- SshKey +Organization "1" -[#blue]r- "0..n" Cipher : \t\t\t + +@enduml diff --git a/pyproject.toml b/pyproject.toml index ad70121..3b42321 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,23 @@ version = "0.1.0" description = "Add your description here" readme = "README.md" requires-python = ">=3.13" -dependencies = [] +dependencies = [ + "hkdf>=0.0.3", + "pycryptodome>=3.23.0", + "pyjwt[crypto]>=2.10.1", + "requests>=2.32.5", +] + +[dependency-groups] +dev = [ + "black>=25.9.0", +] + +[tool.black] +line-length = 130 +target-version = ['py313'] +include = '\.pyi?$' + [build-system] requires = ["uv_build>=0.8.14,<0.9.0"] diff --git a/src/kion_vault/__init__.py b/src/kion_vault/__init__.py index e69de29..8009761 100644 --- a/src/kion_vault/__init__.py +++ b/src/kion_vault/__init__.py @@ -0,0 +1 @@ +from .vault import Vault \ No newline at end of file diff --git a/src/kion_vault/_id_name_lookup.py b/src/kion_vault/_id_name_lookup.py new file mode 100644 index 0000000..c92827a --- /dev/null +++ b/src/kion_vault/_id_name_lookup.py @@ -0,0 +1,18 @@ +from kion_vault.exceptions import DuplicateNameError + + +# noinspection PyUnresolvedReferences +class _IdNameLookup: + def __init__(self, id, name): + self._lookup_id[id] = self + if name in self._lookup_name: + print(f"Warning: Duplicate Name {name!r} for {self.__class__.__name__!r}") + self._lookup_name[name] = self + + @classmethod + def get(cls, name: str = None, id: str = None, ): + if id is None and name is None: + raise ValueError("either id or name must be specified") + if id is not None: + return cls._lookup_id[id] + return cls._lookup_name[name] diff --git a/src/kion_vault/attachment.py b/src/kion_vault/attachment.py new file mode 100644 index 0000000..46662f3 --- /dev/null +++ b/src/kion_vault/attachment.py @@ -0,0 +1,22 @@ +import os + +import requests + +from kion_vault.cipher_string import CipherString, decrypt_4 +from kion_vault.key import Key + + +class Attachment: + + def __init__(self, key: Key, json: dict) -> None: + self.name = CipherString.parse(json['fileName']).decrypt(key).decode() + self.key = CipherString.parse(json['key']).decrypt(key).decode() + self.url = json['url'] + + def download(self, name: str = None): + data = requests.get(self.url, verify=os.getenv('CA_PATH', False)).content + if not data: + raise RuntimeError + plaintext = decrypt_4(self.key, data) + with open(name or self.name, 'wb') as f: + f.write(plaintext) diff --git a/src/kion_vault/cipher.py b/src/kion_vault/cipher.py new file mode 100644 index 0000000..479f142 --- /dev/null +++ b/src/kion_vault/cipher.py @@ -0,0 +1,329 @@ +import types + +from ._id_name_lookup import _IdNameLookup +from .attachment import Attachment +from .cipher_string import CipherString +from .key import Key +from .organization import Organization + + +class Cipher(_IdNameLookup): + _lookup_id = {} + _lookup_name = {} + + id: str + name: str + organization: Organization = None + _attachments: dict = None + _fields: dict = None + _notes: str = None + + def __new__(cls, key: Key, json: dict): + if cls is not Cipher: + return super().__new__(cls) + return super().__new__(CIPHER_MAP[json["type"]]) + + def __init__(self, key: Key, json: dict) -> None: + self.id = json["id"] + if json['organizationId']: + self.organization = Organization.get(id=json['organizationId']) + key = self.organization.key + self.key = key + self.name = CipherString.parse(json['name']).decrypt(key).decode() + + self._attachments_raw: list[dict] = json.get("attachments", []) + self._fields_raw: list[dict] = json.get("fields", []) + self._notes_raw: str | None = json.get("notes", None) + + super().__init__(self.id, self.name) + + @property + def attachments(self) -> dict: + if self._attachments is None: + self._attachments = {} + for i in self._attachments_raw: + attachment = Attachment(self.key, i) + self._attachments[attachment.name] = attachment + return self._attachments + + @property + def fields(self) -> dict: + if self._fields is None: + self._fields = {} + for i in self._fields_raw: + value = CipherString.parse(i['value']).decrypt(self.key).decode() if i['value'] else None + if i['type'] == 2: + value = value == 'true' + self._fields[CipherString.parse(i['name']).decrypt(self.key).decode()] = value + return self._fields + + @property + def notes(self) -> str: + if self._notes is None: + self._notes = CipherString.parse(self._notes_raw).decrypt(self.key).decode() if self._notes_raw else "" + return self._notes + + def __str__(self): + return self.name + + +class Login(Cipher): + _username: str = None + _password: str = None + _uri: str = None + _uris: list[str] = None + + def __init__(self, key: Key, json: dict) -> None: + super().__init__(key, json) + self._raw = json['login'] + + @property + def username(self) -> str: + if self._username is None: + self._username = CipherString.parse(self._raw['username']).decrypt(self.key).decode() if self._raw['username'] else "" + return self._username + + @property + def password(self) -> str: + if self._password is None: + self._password = CipherString.parse(self._raw['password']).decrypt(self.key).decode() if self._raw['password'] else "" + return self._password + + @property + def uri(self) -> str: + if self._uri is None: + self._uri = CipherString.parse(self._raw['uri']).decrypt(self.key).decode() if self._raw['uri'] else "" + return self._uri + + @property + def uris(self) -> list[str]: + if self._uris is None: + self._uris = [CipherString.parse(i['uri']).decrypt(self.key).decode() for i in self._raw['uris']] + return self._uris + + +class SecureNote(Cipher): + pass + + +class Card(Cipher): + _brand: str = None + _cardholder_name: str = None + _code: str = None + _expiration_month: str = None + _expiration_year: str = None + _number: str = None + + def __init__(self, key: Key, json: dict) -> None: + super().__init__(key, json) + self._raw = json['card'] + + @property + def brand(self) -> str: + if self._brand is None: + self._brand = CipherString.parse(self._raw['brand']).decrypt(self.key).decode() if self._raw['brand'] else "" + return self._brand + + @property + def cardholder_name(self) -> str: + if self._cardholder_name is None: + self._cardholder_name = CipherString.parse(self._raw['cardholderName']).decrypt(self.key).decode() if self._raw['cardholderName'] else "" + return self._cardholder_name + + @property + def code(self) -> str: + if self._code is None: + self._code = CipherString.parse(self._raw['code']).decrypt(self.key).decode() if self._raw['code'] else "" + return self._code + + @property + def expiration_month(self) -> str: + if self._expiration_month is None: + self._expiration_month = CipherString.parse(self._raw['expMonth']).decrypt(self.key).decode() if self._raw['expMonth'] else "" + return self._expiration_month + + @property + def expiration_year(self) -> str: + if self._expiration_year is None: + self._expiration_year = CipherString.parse(self._raw['expYear']).decrypt(self.key).decode() if self._raw['expYear'] else "" + return self._expiration_year + + @property + def number(self) -> str: + if self._number is None: + self._number = CipherString.parse(self._raw['number']).decrypt(self.key).decode() if self._raw['number'] else "" + return self._number + + +class Identity(Cipher): + _address1: str = None + _address2: str = None + _address3: str = None + _city: str = None + _company: str = None + _country: str = None + _email: str = None + _first_name: str = None + _last_name: str = None + _license_number: str = None + _middle_name: str = None + _passport_number: str = None + _phone: str = None + _postal_code: str = None + _ssn: str = None + _state: str = None + _title: str = None + _username: str = None + + def __init__(self, key: Key, json: dict) -> None: + super().__init__(key, json) + self._raw = json['identity'] + + @property + def address1(self) -> str: + if self._address1 is None: + self._address1 = CipherString.parse(self._raw['address1']).decrypt(self.key).decode() if self._raw['address1'] else "" + return self._address1 + + @property + def address2(self) -> str: + if self._address2 is None: + self._address2 = CipherString.parse(self._raw['address2']).decrypt(self.key).decode() if self._raw['address2'] else "" + return self._address2 + + @property + def address3(self) -> str: + if self._address3 is None: + self._address3 = CipherString.parse(self._raw['address3']).decrypt(self.key).decode() if self._raw['address3'] else "" + return self._address3 + + @property + def city(self) -> str: + if self._city is None: + self._city = CipherString.parse(self._raw['city']).decrypt(self.key).decode() if self._raw['city'] else "" + return self._city + + @property + def company(self) -> str: + if self._company is None: + self._company = CipherString.parse(self._raw['company']).decrypt(self.key).decode() if self._raw['company'] else "" + return self._company + + @property + def country(self) -> str: + if self._country is None: + self._country = CipherString.parse(self._raw['country']).decrypt(self.key).decode() if self._raw['country'] else "" + return self._country + + @property + def email(self) -> str: + if self._email is None: + self._email = CipherString.parse(self._raw['email']).decrypt(self.key).decode() if self._raw['email'] else "" + return self._email + + @property + def first_name(self) -> str: + if self._first_name is None: + self._first_name = CipherString.parse(self._raw['firstName']).decrypt(self.key).decode() if self._raw['firstName'] else "" + return self._first_name + + @property + def last_name(self) -> str: + if self._last_name is None: + self._last_name = CipherString.parse(self._raw['lastName']).decrypt(self.key).decode() if self._raw['lastName'] else "" + return self._last_name + + @property + def license_number(self) -> str: + if self._license_number is None: + self._license_number = CipherString.parse(self._raw['licenseNumber']).decrypt(self.key).decode() if self._raw['licenseNumber'] else "" + return self._license_number + + @property + def middle_name(self) -> str: + if self._middle_name is None: + self._middle_name = CipherString.parse(self._raw['middleName']).decrypt(self.key).decode() if self._raw['middleName'] else "" + return self._middle_name + + @property + def passport_number(self) -> str: + if self._passport_number is None: + self._passport_number = CipherString.parse(self._raw['passportNumber']).decrypt(self.key).decode() if self._raw['passportNumber'] else "" + return self._passport_number + + @property + def phone(self) -> str: + if self._phone is None: + self._phone = CipherString.parse(self._raw['phone']).decrypt(self.key).decode() if self._raw['phone'] else "" + return self._phone + + @property + def postal_code(self) -> str: + if self._postal_code is None: + self._postal_code = CipherString.parse(self._raw['postalCode']).decrypt(self.key).decode() if self._raw['postalCode'] else "" + return self._postal_code + + @property + def ssn(self) -> str: + if self._ssn is None: + self._ssn = CipherString.parse(self._raw['ssn']).decrypt(self.key).decode() if self._raw['ssn'] else "" + return self._ssn + + @property + def state(self) -> str: + if self._state is None: + self._state = CipherString.parse(self._raw['state']).decrypt(self.key).decode() if self._raw['state'] else "" + return self._state + + @property + def title(self) -> str: + if self._title is None: + self._title = CipherString.parse(self._raw['title']).decrypt(self.key).decode() if self._raw['title'] else "" + return self._title + + @property + def username(self) -> str: + if self._username is None: + self._username = CipherString.parse(self._raw['username']).decrypt(self.key).decode() if self._raw['username'] else "" + return self._username + + + +class SshKey(Cipher): + _private_key: str = None + _public_key: str = None + _key_fingerprint: str = None + + def __init__(self, key: Key, json: dict) -> None: + super().__init__(key, json) + self._raw = json['sshKey'] + + + @property + def private_key(self) -> str: + if self._private_key is None: + self._private_key = CipherString.parse(self._raw['privateKey']).decrypt(self.key).decode() + return self._private_key + + @property + def public_key(self) -> str: + if self._public_key is None: + self._public_key = CipherString.parse(self._raw['publicKey']).decrypt(self.key).decode() + return self._public_key + + @property + def key_fingerprint(self) -> str: + if self._key_fingerprint is None: + self._key_fingerprint = CipherString.parse(self._raw['keyFingerprint']).decrypt(self.key).decode() + return self._key_fingerprint + + +CIPHER_MAP = types.MappingProxyType( + { + 1: Login, + 2: SecureNote, + 3: Card, + 4: Identity, + 5: SshKey, + } +) diff --git a/src/kion_vault/cipher_string.py b/src/kion_vault/cipher_string.py new file mode 100644 index 0000000..6bb19a6 --- /dev/null +++ b/src/kion_vault/cipher_string.py @@ -0,0 +1,109 @@ +import base64 +import binascii +import hashlib +import hmac +import re +import types + +from Crypto.Cipher import AES, PKCS1_OAEP +from Crypto.PublicKey import RSA + +from .exceptions import Base64DecodeError, EncryptionTypeNotImplementedError, InvalidCipherStringError +from .key import Key + +ENCRYPTION_TYPES = types.MappingProxyType( + { + "0": "AES-256 CBC", + "1": "AES-128 CBC - HMAC SHA-256", + "2": "AES-256 CBC - HMAC SHA-256", + "3": "RSA-2048 - OAEP SHA-256", + "4": "RSA-2048 - OAEP SHA-1", + "5": "RSA-2048 - OAEP SHA-256 - HMAC SHA-256", + "6": "RSA-2048 - OAEP SHA-1 - HMAC SHA-256", + } +) + + +def decode_b64(value: str, name: str) -> bytes: + try: + return base64.b64decode(value) + except binascii.Error: + raise Base64DecodeError(name, value) + + +def decrypt_4(key: RSA.RsaKey, cipher_text) -> bytes: + return PKCS1_OAEP.new(key).decrypt(cipher_text) + + + +class CipherString: + + cipher_text: bytes + initialization_vector: bytes = None + message_authentication_code: bytes = None + + def __init__( + self, + cipher_text: bytes, + initialization_vector: bytes = None, + message_authentication_code: bytes = None, + ): + self.cipher_text = cipher_text + self.initialization_vector = initialization_vector + self.message_authentication_code = message_authentication_code + + @classmethod + def parse(cls, value: str): + if value[0] == "2": + return CipherStringType2.parse(value) + if value[0] == "4": + return CipherStringType4.parse(value) + raise EncryptionTypeNotImplementedError(ENCRYPTION_TYPES[value[0]]) + + +class CipherStringType2(CipherString): + """Cipher String AES-256 CBC - HMAC SHA-256""" + + RE = re.compile(r"^2\.([^|]+)\|([^|]+)\|(.+)") + + @classmethod + def parse(cls, value: str): + m = cls.RE.match(value) + if m is None: + raise InvalidCipherStringError(value) + kwargs = {k: decode_b64(m.group(i + 1), k) for i, k in enumerate(( + "initialization_vector", + "cipher_text", + "message_authentication_code"))} + return cls(**kwargs) + + def verify(self, key: Key) -> bool: + return hmac.new(key.message_authentication_key, + self.initialization_vector + self.cipher_text, + hashlib.sha256).digest() == self.message_authentication_code + + def decrypt(self, key: Key) -> bytes: + plaintext = AES.new(key.encryption_key, + AES.MODE_CBC, + iv=self.initialization_vector + ).decrypt(self.cipher_text) + pad_len = plaintext[-1] + padding = bytes([pad_len] * pad_len) + if plaintext[-len(padding):] == padding: + plaintext = plaintext[:-pad_len] + return plaintext + +class CipherStringType4(CipherString): + """Cipher String RSA-2048 - OAEP SHA-1""" + + RE = re.compile(r"^4\.(.+)") + + @classmethod + def parse(cls, value: str): + m = cls.RE.match(value) + if m is None: + raise InvalidCipherStringError(value) + return cls(cipher_text=decode_b64(m.group(1), "cipher_text")) + + def decrypt(self, key: RSA.RsaKey) -> bytes: + return decrypt_4(key, self.cipher_text) diff --git a/src/kion_vault/exceptions.py b/src/kion_vault/exceptions.py new file mode 100644 index 0000000..09e45f8 --- /dev/null +++ b/src/kion_vault/exceptions.py @@ -0,0 +1,24 @@ +class VaultError(Exception): + pass + + +class DuplicateNameError(ValueError, VaultError): + def __init__(self, cls, name): + self.msg = f"Duplicate name {name!r} in class {cls.__name__!r}" + super().__init__(self.msg) + self.cls = cls + self.name = name + + +class InvalidCipherStringError(VaultError, ValueError): pass + + +class Base64DecodeError(VaultError, ValueError): + def __init__(self, name, value): + self.msg = f"Cannot decode {value!r} for {name!r}" + super().__init__(self.msg) + self.name = name + self.value = value + + +class EncryptionTypeNotImplementedError(VaultError, ValueError): pass diff --git a/src/kion_vault/key.py b/src/kion_vault/key.py new file mode 100644 index 0000000..e2c9af7 --- /dev/null +++ b/src/kion_vault/key.py @@ -0,0 +1,27 @@ +import hashlib + +import hkdf + + +class Key: + + def __init__(self, key: bytes) -> None: + self.key = key + + @property + def encryption_key(self) -> bytes: + return self.key[:32] + + @property + def message_authentication_key(self) -> bytes: + return self.key[32:] + + @classmethod + def from_logon(cls, email: str, password: str, iterations: int) -> "Key": + source_key = hashlib.pbkdf2_hmac( + 'sha256', + password.encode(), + email.encode(), + iterations) + return cls(hkdf.hkdf_expand(source_key, b'enc', 32, hashlib.sha256) + + hkdf.hkdf_expand(source_key, b'mac', 32, hashlib.sha256)) diff --git a/src/kion_vault/organization.py b/src/kion_vault/organization.py new file mode 100644 index 0000000..a3f2df1 --- /dev/null +++ b/src/kion_vault/organization.py @@ -0,0 +1,16 @@ +from Crypto.PublicKey.RSA import RsaKey + +from ._id_name_lookup import _IdNameLookup +from .cipher_string import CipherString +from .key import Key + + +class Organization(_IdNameLookup): + _lookup_id = {} + _lookup_name = {} + + def __init__(self, key: RsaKey, json: dict) -> None: + self.id = json['id'] + self.name = json['name'] + self.key = Key(CipherString.parse(json['key']).decrypt(key)) + super().__init__(self.id, self.name) diff --git a/src/kion_vault/vault.py b/src/kion_vault/vault.py new file mode 100644 index 0000000..a593c31 --- /dev/null +++ b/src/kion_vault/vault.py @@ -0,0 +1,120 @@ +import datetime +import json +import os +import platform +import time + +import requests +from Crypto.PublicKey import RSA + +from .cipher import Cipher +from .cipher_string import CipherString +from .key import Key +from .organization import Organization + + +def dump_response(name, data): + if os.getenv('WARDEN_DUMP', False): + with open(f"{datetime.datetime.now(datetime.UTC).strftime('%Y-%m-%d-%H-%M-%S-%f')}_{name}.json", "w") as f: + json.dump(data, f, indent=2) + + +class Vault: + _token_timer: float = 0.0 + _token: str + _kdf_iterations: int = None + _intermediate_key: Key = None + key: Key = None + private_key: RSA.RsaKey = None + organizations = Organization + ciphers = Cipher + + def __init__( + self, + host: str = os.getenv("WARDEN_HOST"), + email: str = os.getenv("WARDEN_EMAIL"), + password: str = os.getenv("WARDEN_PASSWORD"), + client_id: str = os.getenv("WARDEN_CLIENT_ID"), + client_secret: str = os.getenv("WARDEN_CLIENT_SECRET"), + scope: str = 'api', + grant_type: str = 'client_credentials', + device_identifier: str = None, + device_name: str = 'automation_api', + device_type: int = 21) -> None: + if device_identifier is None: + device_identifier = platform.node() + self.email = email + self.password = password + self.client_id = client_id + self.client_secret = client_secret + self.host = host.rstrip('/') + self.scope = scope + self.grant_type = grant_type + self.device_identifier = device_identifier + self.device_name = device_name + self.device_type = device_type + + response = requests.get(f'{self.host}/api/sync', + headers = { + "Accept": "application/json; charset=utf-8", + "Bitwarden-Client-Version": '2025.1.1', + "content-type": "application/json; charset=utf-8", + 'Authorization': f"Bearer {self.token}", + }, + verify=os.getenv('CA_PATH', False)) + response.raise_for_status() + dump_response("api_sync", response.json()) + + [Organization(self.private_key, i) for i in response.json()['profile'].get('organizations', [])] + [Cipher(self.key, i) for i in response.json().get('ciphers', [])] + + + @property + def kdf_iterations(self) -> int: + if self._kdf_iterations is None: + response = requests.post(f'{self.host}/api/accounts/prelogin', + json=dict(email=self.email), + verify=os.getenv('CA_PATH', False)) + response.raise_for_status() + dump_response("api_accounts_prelogin", response.json()) + + self._kdf_iterations = response.json()['kdfIterations'] + return self._kdf_iterations + + @property + def intermediate_key(self) -> Key: + if self._intermediate_key is None: + self._intermediate_key = Key.from_logon( + email=self.email, + password=self.password, + iterations=self.kdf_iterations) + return self._intermediate_key + + @property + def token(self) -> str: + if self._token_timer < time.time(): + response = requests.post(f'{self.host}/identity/connect/token', + data=dict( + client_id=self.client_id, + client_secret=self.client_secret, + device_identifier=self.device_identifier, + device_name=self.device_name, + device_type=self.device_type, + grant_type=self.grant_type, + scope=self.scope, + ), + verify=os.getenv('CA_PATH', False)) + response.raise_for_status() + dump_response("identity_connect_token", response.json()) + + self._token_timer = time.time() + response.json()['expires_in'] - 5 + self._token = response.json()['access_token'] + key = CipherString.parse(response.json()['Key']) + if not key.verify(self.intermediate_key): + raise ValueError('key') + self.key = Key(key.decrypt(self.intermediate_key)) + private_key = CipherString.parse(response.json()['PrivateKey']) + if not private_key.verify(self.key): + raise ValueError('private_key') + self.private_key = RSA.importKey(private_key.decrypt(self.key)) + return self._token