This commit is contained in:
Bernhard Radermacher
2025-10-08 11:31:25 +00:00
parent 858beb0c15
commit 22e68c7a2d
11 changed files with 773 additions and 1 deletions

90
cipher.plantuml Normal file
View File

@@ -0,0 +1,90 @@
@startuml
hide empty members
skinparam groupInheritance 2
skinparam linetype polyline
abstract IdNameLookup {
+ get(id: str = None, name: str = None) : **IdNameLookup** {static}
}
together {
class Organization {
- id
- key
+ name
}
abstract Cipher {
- id
- key
+ name
+ organization
+ attachments
+ fields
+ notes
}
}
class Login {
+ username
+ password
+ uri
+ uris
}
class SecureNote
class Card {
+ cardholder_name
+ number
+ brand
+ expiration_month
+ expiration_year
+ code
}
class Identity {
+ title
+ first_name
+ middle_name
+ last_name
+ username
+ company
+ ssn
+ passport_number
+ license_number
+ email
+ phone
+ address1
+ address2
+ address3
+ city
+ state
+ postal_code
+ country
}
class SshKey {
+ private_key
+ public_key
+ key_fingerprint
}
IdNameLookup <|-- Cipher
IdNameLookup <|-- Organization
Cipher <|-- Card
Cipher <|-- Identity
Cipher <|-- Login
Cipher <|-- SecureNote
Cipher <|-- SshKey
Organization "1" -[#blue]r- "0..n" Cipher : \t\t\t
@enduml

View File

@@ -4,7 +4,23 @@ version = "0.1.0"
description = "Add your description here" description = "Add your description here"
readme = "README.md" readme = "README.md"
requires-python = ">=3.13" requires-python = ">=3.13"
dependencies = [] dependencies = [
"hkdf>=0.0.3",
"pycryptodome>=3.23.0",
"pyjwt[crypto]>=2.10.1",
"requests>=2.32.5",
]
[dependency-groups]
dev = [
"black>=25.9.0",
]
[tool.black]
line-length = 130
target-version = ['py313']
include = '\.pyi?$'
[build-system] [build-system]
requires = ["uv_build>=0.8.14,<0.9.0"] requires = ["uv_build>=0.8.14,<0.9.0"]

View File

@@ -0,0 +1 @@
from .vault import Vault

View File

@@ -0,0 +1,18 @@
from kion_vault.exceptions import DuplicateNameError
# noinspection PyUnresolvedReferences
class _IdNameLookup:
def __init__(self, id, name):
self._lookup_id[id] = self
if name in self._lookup_name:
print(f"Warning: Duplicate Name {name!r} for {self.__class__.__name__!r}")
self._lookup_name[name] = self
@classmethod
def get(cls, name: str = None, id: str = None, ):
if id is None and name is None:
raise ValueError("either id or name must be specified")
if id is not None:
return cls._lookup_id[id]
return cls._lookup_name[name]

View File

@@ -0,0 +1,22 @@
import os
import requests
from kion_vault.cipher_string import CipherString, decrypt_4
from kion_vault.key import Key
class Attachment:
def __init__(self, key: Key, json: dict) -> None:
self.name = CipherString.parse(json['fileName']).decrypt(key).decode()
self.key = CipherString.parse(json['key']).decrypt(key).decode()
self.url = json['url']
def download(self, name: str = None):
data = requests.get(self.url, verify=os.getenv('CA_PATH', False)).content
if not data:
raise RuntimeError
plaintext = decrypt_4(self.key, data)
with open(name or self.name, 'wb') as f:
f.write(plaintext)

329
src/kion_vault/cipher.py Normal file
View File

@@ -0,0 +1,329 @@
import types
from ._id_name_lookup import _IdNameLookup
from .attachment import Attachment
from .cipher_string import CipherString
from .key import Key
from .organization import Organization
class Cipher(_IdNameLookup):
_lookup_id = {}
_lookup_name = {}
id: str
name: str
organization: Organization = None
_attachments: dict = None
_fields: dict = None
_notes: str = None
def __new__(cls, key: Key, json: dict):
if cls is not Cipher:
return super().__new__(cls)
return super().__new__(CIPHER_MAP[json["type"]])
def __init__(self, key: Key, json: dict) -> None:
self.id = json["id"]
if json['organizationId']:
self.organization = Organization.get(id=json['organizationId'])
key = self.organization.key
self.key = key
self.name = CipherString.parse(json['name']).decrypt(key).decode()
self._attachments_raw: list[dict] = json.get("attachments", [])
self._fields_raw: list[dict] = json.get("fields", [])
self._notes_raw: str | None = json.get("notes", None)
super().__init__(self.id, self.name)
@property
def attachments(self) -> dict:
if self._attachments is None:
self._attachments = {}
for i in self._attachments_raw:
attachment = Attachment(self.key, i)
self._attachments[attachment.name] = attachment
return self._attachments
@property
def fields(self) -> dict:
if self._fields is None:
self._fields = {}
for i in self._fields_raw:
value = CipherString.parse(i['value']).decrypt(self.key).decode() if i['value'] else None
if i['type'] == 2:
value = value == 'true'
self._fields[CipherString.parse(i['name']).decrypt(self.key).decode()] = value
return self._fields
@property
def notes(self) -> str:
if self._notes is None:
self._notes = CipherString.parse(self._notes_raw).decrypt(self.key).decode() if self._notes_raw else ""
return self._notes
def __str__(self):
return self.name
class Login(Cipher):
_username: str = None
_password: str = None
_uri: str = None
_uris: list[str] = None
def __init__(self, key: Key, json: dict) -> None:
super().__init__(key, json)
self._raw = json['login']
@property
def username(self) -> str:
if self._username is None:
self._username = CipherString.parse(self._raw['username']).decrypt(self.key).decode() if self._raw['username'] else ""
return self._username
@property
def password(self) -> str:
if self._password is None:
self._password = CipherString.parse(self._raw['password']).decrypt(self.key).decode() if self._raw['password'] else ""
return self._password
@property
def uri(self) -> str:
if self._uri is None:
self._uri = CipherString.parse(self._raw['uri']).decrypt(self.key).decode() if self._raw['uri'] else ""
return self._uri
@property
def uris(self) -> list[str]:
if self._uris is None:
self._uris = [CipherString.parse(i['uri']).decrypt(self.key).decode() for i in self._raw['uris']]
return self._uris
class SecureNote(Cipher):
pass
class Card(Cipher):
_brand: str = None
_cardholder_name: str = None
_code: str = None
_expiration_month: str = None
_expiration_year: str = None
_number: str = None
def __init__(self, key: Key, json: dict) -> None:
super().__init__(key, json)
self._raw = json['card']
@property
def brand(self) -> str:
if self._brand is None:
self._brand = CipherString.parse(self._raw['brand']).decrypt(self.key).decode() if self._raw['brand'] else ""
return self._brand
@property
def cardholder_name(self) -> str:
if self._cardholder_name is None:
self._cardholder_name = CipherString.parse(self._raw['cardholderName']).decrypt(self.key).decode() if self._raw['cardholderName'] else ""
return self._cardholder_name
@property
def code(self) -> str:
if self._code is None:
self._code = CipherString.parse(self._raw['code']).decrypt(self.key).decode() if self._raw['code'] else ""
return self._code
@property
def expiration_month(self) -> str:
if self._expiration_month is None:
self._expiration_month = CipherString.parse(self._raw['expMonth']).decrypt(self.key).decode() if self._raw['expMonth'] else ""
return self._expiration_month
@property
def expiration_year(self) -> str:
if self._expiration_year is None:
self._expiration_year = CipherString.parse(self._raw['expYear']).decrypt(self.key).decode() if self._raw['expYear'] else ""
return self._expiration_year
@property
def number(self) -> str:
if self._number is None:
self._number = CipherString.parse(self._raw['number']).decrypt(self.key).decode() if self._raw['number'] else ""
return self._number
class Identity(Cipher):
_address1: str = None
_address2: str = None
_address3: str = None
_city: str = None
_company: str = None
_country: str = None
_email: str = None
_first_name: str = None
_last_name: str = None
_license_number: str = None
_middle_name: str = None
_passport_number: str = None
_phone: str = None
_postal_code: str = None
_ssn: str = None
_state: str = None
_title: str = None
_username: str = None
def __init__(self, key: Key, json: dict) -> None:
super().__init__(key, json)
self._raw = json['identity']
@property
def address1(self) -> str:
if self._address1 is None:
self._address1 = CipherString.parse(self._raw['address1']).decrypt(self.key).decode() if self._raw['address1'] else ""
return self._address1
@property
def address2(self) -> str:
if self._address2 is None:
self._address2 = CipherString.parse(self._raw['address2']).decrypt(self.key).decode() if self._raw['address2'] else ""
return self._address2
@property
def address3(self) -> str:
if self._address3 is None:
self._address3 = CipherString.parse(self._raw['address3']).decrypt(self.key).decode() if self._raw['address3'] else ""
return self._address3
@property
def city(self) -> str:
if self._city is None:
self._city = CipherString.parse(self._raw['city']).decrypt(self.key).decode() if self._raw['city'] else ""
return self._city
@property
def company(self) -> str:
if self._company is None:
self._company = CipherString.parse(self._raw['company']).decrypt(self.key).decode() if self._raw['company'] else ""
return self._company
@property
def country(self) -> str:
if self._country is None:
self._country = CipherString.parse(self._raw['country']).decrypt(self.key).decode() if self._raw['country'] else ""
return self._country
@property
def email(self) -> str:
if self._email is None:
self._email = CipherString.parse(self._raw['email']).decrypt(self.key).decode() if self._raw['email'] else ""
return self._email
@property
def first_name(self) -> str:
if self._first_name is None:
self._first_name = CipherString.parse(self._raw['firstName']).decrypt(self.key).decode() if self._raw['firstName'] else ""
return self._first_name
@property
def last_name(self) -> str:
if self._last_name is None:
self._last_name = CipherString.parse(self._raw['lastName']).decrypt(self.key).decode() if self._raw['lastName'] else ""
return self._last_name
@property
def license_number(self) -> str:
if self._license_number is None:
self._license_number = CipherString.parse(self._raw['licenseNumber']).decrypt(self.key).decode() if self._raw['licenseNumber'] else ""
return self._license_number
@property
def middle_name(self) -> str:
if self._middle_name is None:
self._middle_name = CipherString.parse(self._raw['middleName']).decrypt(self.key).decode() if self._raw['middleName'] else ""
return self._middle_name
@property
def passport_number(self) -> str:
if self._passport_number is None:
self._passport_number = CipherString.parse(self._raw['passportNumber']).decrypt(self.key).decode() if self._raw['passportNumber'] else ""
return self._passport_number
@property
def phone(self) -> str:
if self._phone is None:
self._phone = CipherString.parse(self._raw['phone']).decrypt(self.key).decode() if self._raw['phone'] else ""
return self._phone
@property
def postal_code(self) -> str:
if self._postal_code is None:
self._postal_code = CipherString.parse(self._raw['postalCode']).decrypt(self.key).decode() if self._raw['postalCode'] else ""
return self._postal_code
@property
def ssn(self) -> str:
if self._ssn is None:
self._ssn = CipherString.parse(self._raw['ssn']).decrypt(self.key).decode() if self._raw['ssn'] else ""
return self._ssn
@property
def state(self) -> str:
if self._state is None:
self._state = CipherString.parse(self._raw['state']).decrypt(self.key).decode() if self._raw['state'] else ""
return self._state
@property
def title(self) -> str:
if self._title is None:
self._title = CipherString.parse(self._raw['title']).decrypt(self.key).decode() if self._raw['title'] else ""
return self._title
@property
def username(self) -> str:
if self._username is None:
self._username = CipherString.parse(self._raw['username']).decrypt(self.key).decode() if self._raw['username'] else ""
return self._username
class SshKey(Cipher):
_private_key: str = None
_public_key: str = None
_key_fingerprint: str = None
def __init__(self, key: Key, json: dict) -> None:
super().__init__(key, json)
self._raw = json['sshKey']
@property
def private_key(self) -> str:
if self._private_key is None:
self._private_key = CipherString.parse(self._raw['privateKey']).decrypt(self.key).decode()
return self._private_key
@property
def public_key(self) -> str:
if self._public_key is None:
self._public_key = CipherString.parse(self._raw['publicKey']).decrypt(self.key).decode()
return self._public_key
@property
def key_fingerprint(self) -> str:
if self._key_fingerprint is None:
self._key_fingerprint = CipherString.parse(self._raw['keyFingerprint']).decrypt(self.key).decode()
return self._key_fingerprint
CIPHER_MAP = types.MappingProxyType(
{
1: Login,
2: SecureNote,
3: Card,
4: Identity,
5: SshKey,
}
)

View File

@@ -0,0 +1,109 @@
import base64
import binascii
import hashlib
import hmac
import re
import types
from Crypto.Cipher import AES, PKCS1_OAEP
from Crypto.PublicKey import RSA
from .exceptions import Base64DecodeError, EncryptionTypeNotImplementedError, InvalidCipherStringError
from .key import Key
ENCRYPTION_TYPES = types.MappingProxyType(
{
"0": "AES-256 CBC",
"1": "AES-128 CBC - HMAC SHA-256",
"2": "AES-256 CBC - HMAC SHA-256",
"3": "RSA-2048 - OAEP SHA-256",
"4": "RSA-2048 - OAEP SHA-1",
"5": "RSA-2048 - OAEP SHA-256 - HMAC SHA-256",
"6": "RSA-2048 - OAEP SHA-1 - HMAC SHA-256",
}
)
def decode_b64(value: str, name: str) -> bytes:
try:
return base64.b64decode(value)
except binascii.Error:
raise Base64DecodeError(name, value)
def decrypt_4(key: RSA.RsaKey, cipher_text) -> bytes:
return PKCS1_OAEP.new(key).decrypt(cipher_text)
class CipherString:
cipher_text: bytes
initialization_vector: bytes = None
message_authentication_code: bytes = None
def __init__(
self,
cipher_text: bytes,
initialization_vector: bytes = None,
message_authentication_code: bytes = None,
):
self.cipher_text = cipher_text
self.initialization_vector = initialization_vector
self.message_authentication_code = message_authentication_code
@classmethod
def parse(cls, value: str):
if value[0] == "2":
return CipherStringType2.parse(value)
if value[0] == "4":
return CipherStringType4.parse(value)
raise EncryptionTypeNotImplementedError(ENCRYPTION_TYPES[value[0]])
class CipherStringType2(CipherString):
"""Cipher String AES-256 CBC - HMAC SHA-256"""
RE = re.compile(r"^2\.([^|]+)\|([^|]+)\|(.+)")
@classmethod
def parse(cls, value: str):
m = cls.RE.match(value)
if m is None:
raise InvalidCipherStringError(value)
kwargs = {k: decode_b64(m.group(i + 1), k) for i, k in enumerate((
"initialization_vector",
"cipher_text",
"message_authentication_code"))}
return cls(**kwargs)
def verify(self, key: Key) -> bool:
return hmac.new(key.message_authentication_key,
self.initialization_vector + self.cipher_text,
hashlib.sha256).digest() == self.message_authentication_code
def decrypt(self, key: Key) -> bytes:
plaintext = AES.new(key.encryption_key,
AES.MODE_CBC,
iv=self.initialization_vector
).decrypt(self.cipher_text)
pad_len = plaintext[-1]
padding = bytes([pad_len] * pad_len)
if plaintext[-len(padding):] == padding:
plaintext = plaintext[:-pad_len]
return plaintext
class CipherStringType4(CipherString):
"""Cipher String RSA-2048 - OAEP SHA-1"""
RE = re.compile(r"^4\.(.+)")
@classmethod
def parse(cls, value: str):
m = cls.RE.match(value)
if m is None:
raise InvalidCipherStringError(value)
return cls(cipher_text=decode_b64(m.group(1), "cipher_text"))
def decrypt(self, key: RSA.RsaKey) -> bytes:
return decrypt_4(key, self.cipher_text)

View File

@@ -0,0 +1,24 @@
class VaultError(Exception):
pass
class DuplicateNameError(ValueError, VaultError):
def __init__(self, cls, name):
self.msg = f"Duplicate name {name!r} in class {cls.__name__!r}"
super().__init__(self.msg)
self.cls = cls
self.name = name
class InvalidCipherStringError(VaultError, ValueError): pass
class Base64DecodeError(VaultError, ValueError):
def __init__(self, name, value):
self.msg = f"Cannot decode {value!r} for {name!r}"
super().__init__(self.msg)
self.name = name
self.value = value
class EncryptionTypeNotImplementedError(VaultError, ValueError): pass

27
src/kion_vault/key.py Normal file
View File

@@ -0,0 +1,27 @@
import hashlib
import hkdf
class Key:
def __init__(self, key: bytes) -> None:
self.key = key
@property
def encryption_key(self) -> bytes:
return self.key[:32]
@property
def message_authentication_key(self) -> bytes:
return self.key[32:]
@classmethod
def from_logon(cls, email: str, password: str, iterations: int) -> "Key":
source_key = hashlib.pbkdf2_hmac(
'sha256',
password.encode(),
email.encode(),
iterations)
return cls(hkdf.hkdf_expand(source_key, b'enc', 32, hashlib.sha256) +
hkdf.hkdf_expand(source_key, b'mac', 32, hashlib.sha256))

View File

@@ -0,0 +1,16 @@
from Crypto.PublicKey.RSA import RsaKey
from ._id_name_lookup import _IdNameLookup
from .cipher_string import CipherString
from .key import Key
class Organization(_IdNameLookup):
_lookup_id = {}
_lookup_name = {}
def __init__(self, key: RsaKey, json: dict) -> None:
self.id = json['id']
self.name = json['name']
self.key = Key(CipherString.parse(json['key']).decrypt(key))
super().__init__(self.id, self.name)

120
src/kion_vault/vault.py Normal file
View File

@@ -0,0 +1,120 @@
import datetime
import json
import os
import platform
import time
import requests
from Crypto.PublicKey import RSA
from .cipher import Cipher
from .cipher_string import CipherString
from .key import Key
from .organization import Organization
def dump_response(name, data):
if os.getenv('WARDEN_DUMP', False):
with open(f"{datetime.datetime.now(datetime.UTC).strftime('%Y-%m-%d-%H-%M-%S-%f')}_{name}.json", "w") as f:
json.dump(data, f, indent=2)
class Vault:
_token_timer: float = 0.0
_token: str
_kdf_iterations: int = None
_intermediate_key: Key = None
key: Key = None
private_key: RSA.RsaKey = None
organizations = Organization
ciphers = Cipher
def __init__(
self,
host: str = os.getenv("WARDEN_HOST"),
email: str = os.getenv("WARDEN_EMAIL"),
password: str = os.getenv("WARDEN_PASSWORD"),
client_id: str = os.getenv("WARDEN_CLIENT_ID"),
client_secret: str = os.getenv("WARDEN_CLIENT_SECRET"),
scope: str = 'api',
grant_type: str = 'client_credentials',
device_identifier: str = None,
device_name: str = 'automation_api',
device_type: int = 21) -> None:
if device_identifier is None:
device_identifier = platform.node()
self.email = email
self.password = password
self.client_id = client_id
self.client_secret = client_secret
self.host = host.rstrip('/')
self.scope = scope
self.grant_type = grant_type
self.device_identifier = device_identifier
self.device_name = device_name
self.device_type = device_type
response = requests.get(f'{self.host}/api/sync',
headers = {
"Accept": "application/json; charset=utf-8",
"Bitwarden-Client-Version": '2025.1.1',
"content-type": "application/json; charset=utf-8",
'Authorization': f"Bearer {self.token}",
},
verify=os.getenv('CA_PATH', False))
response.raise_for_status()
dump_response("api_sync", response.json())
[Organization(self.private_key, i) for i in response.json()['profile'].get('organizations', [])]
[Cipher(self.key, i) for i in response.json().get('ciphers', [])]
@property
def kdf_iterations(self) -> int:
if self._kdf_iterations is None:
response = requests.post(f'{self.host}/api/accounts/prelogin',
json=dict(email=self.email),
verify=os.getenv('CA_PATH', False))
response.raise_for_status()
dump_response("api_accounts_prelogin", response.json())
self._kdf_iterations = response.json()['kdfIterations']
return self._kdf_iterations
@property
def intermediate_key(self) -> Key:
if self._intermediate_key is None:
self._intermediate_key = Key.from_logon(
email=self.email,
password=self.password,
iterations=self.kdf_iterations)
return self._intermediate_key
@property
def token(self) -> str:
if self._token_timer < time.time():
response = requests.post(f'{self.host}/identity/connect/token',
data=dict(
client_id=self.client_id,
client_secret=self.client_secret,
device_identifier=self.device_identifier,
device_name=self.device_name,
device_type=self.device_type,
grant_type=self.grant_type,
scope=self.scope,
),
verify=os.getenv('CA_PATH', False))
response.raise_for_status()
dump_response("identity_connect_token", response.json())
self._token_timer = time.time() + response.json()['expires_in'] - 5
self._token = response.json()['access_token']
key = CipherString.parse(response.json()['Key'])
if not key.verify(self.intermediate_key):
raise ValueError('key')
self.key = Key(key.decrypt(self.intermediate_key))
private_key = CipherString.parse(response.json()['PrivateKey'])
if not private_key.verify(self.key):
raise ValueError('private_key')
self.private_key = RSA.importKey(private_key.decrypt(self.key))
return self._token