You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
286 lines
8.4 KiB
Python
286 lines
8.4 KiB
Python
#!/usr/bin/env vpython3
|
|
# Copyright 2025 The ChromiumOS Authors
|
|
# Use of this source code is governed by a BSD-style license that can be
|
|
# found in the LICENSE file.
|
|
|
|
# [VPYTHON:BEGIN]
|
|
# python_version: "3.11"
|
|
# wheel: <
|
|
# name: "infra/python/wheels/cffi/${vpython_platform}"
|
|
# version: "version:1.15.1.chromium.2"
|
|
# >
|
|
# wheel: <
|
|
# name: "infra/python/wheels/cryptography/${vpython_platform}"
|
|
# version: "version:43.0.0"
|
|
# >
|
|
# wheel: <
|
|
# name: "infra/python/wheels/pycparser-py2_py3"
|
|
# version: "version:2.21"
|
|
# >
|
|
# wheel: <
|
|
# name: "infra/python/wheels/fido2-py3"
|
|
# version: "version:2.0.0"
|
|
# >
|
|
# [VPYTHON:END]
|
|
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
from contextlib import contextmanager
|
|
import dataclasses
|
|
import json
|
|
import logging
|
|
import signal
|
|
import sys
|
|
from threading import Event
|
|
from typing import BinaryIO
|
|
|
|
from fido2.client import DefaultClientDataCollector
|
|
from fido2.client import Fido2Client, UserInteraction, WebAuthnClient
|
|
from fido2.hid import CtapHidDevice
|
|
from fido2.webauthn import AuthenticationResponse
|
|
from fido2.webauthn import PublicKeyCredentialRequestOptions, UserVerificationRequirement
|
|
|
|
try:
|
|
from fido2.client.windows import WindowsClient
|
|
except ImportError:
|
|
WindowsClient = None
|
|
|
|
_PLUGIN_ENDIANNESS = 'little'
|
|
_PLUGIN_HEADER_SIZE = 4
|
|
|
|
# Exit codes.
|
|
_EXIT_NO_FIDO2_DEVICES = 11
|
|
_EXIT_ALL_ASSERTIONS_FAILED = 12
|
|
_EXIT_NO_MATCHING_CRED = 13
|
|
|
|
|
|
def read_full(r: BinaryIO, size: int) -> bytes:
|
|
"""Read an exact amount of data.
|
|
|
|
Raises exception on error or EOF.
|
|
"""
|
|
b = r.read(size)
|
|
if len(b) != size:
|
|
raise EOFError(f"premature EOF when reading {size} bytes from {r}.")
|
|
return b
|
|
|
|
|
|
def write_full(w: BinaryIO, b: bytes):
|
|
"""Write all bytes.
|
|
|
|
Raises IOError if the write isn't complete.
|
|
"""
|
|
written = w.write(b)
|
|
if written != len(b):
|
|
raise IOError(
|
|
f"failed to write fully, wrote {written} bytes out of {_PLUGIN_HEADER_SIZE} bytes."
|
|
)
|
|
|
|
|
|
def plugin_read(r: BinaryIO) -> bytes:
|
|
"""Read a framed WebAuthn plugin message.
|
|
|
|
A frame consists of: 4 bytes of little endian uint32 length, plus
|
|
this amount bytes of binary data.
|
|
"""
|
|
header = read_full(r, _PLUGIN_HEADER_SIZE)
|
|
length = int.from_bytes(header, _PLUGIN_ENDIANNESS, signed=False)
|
|
return read_full(r, length)
|
|
|
|
|
|
def plugin_write(w: BinaryIO, b: bytes):
|
|
"""Write a framed Webauthn plugin message.
|
|
|
|
A frame consists of: 4 bytes of little endian uint32 length, plus
|
|
this amount bytes of binary data.
|
|
"""
|
|
length = len(b)
|
|
header = length.to_bytes(_PLUGIN_HEADER_SIZE,
|
|
_PLUGIN_ENDIANNESS,
|
|
signed=False)
|
|
write_full(w, header)
|
|
write_full(w, b)
|
|
|
|
|
|
@dataclasses.dataclass
|
|
class PluginRequest:
|
|
origin: str
|
|
public_key_credential_request: PublicKeyCredentialRequestOptions
|
|
|
|
|
|
def parse_plugin_request(b: bytes) -> PluginRequest:
|
|
"""Parse a plugin request JSON string."""
|
|
j = json.loads(b)
|
|
|
|
req = PublicKeyCredentialRequestOptions.from_dict(j["requestData"])
|
|
|
|
# Apply overrides to certain fields.
|
|
req = PublicKeyCredentialRequestOptions(
|
|
challenge=req.challenge,
|
|
rp_id=req.rp_id,
|
|
allow_credentials=req.allow_credentials,
|
|
hints=req.hints,
|
|
|
|
# Default to 30s timeout.
|
|
timeout=req.timeout or 30_000,
|
|
|
|
# Discourage UV.
|
|
#
|
|
# ReAuth flow is triggered for user who's already logged in, so
|
|
# there's no need to ask for PIN/password authentication factor.
|
|
#
|
|
# Here we only want to test for user presence and ownership of
|
|
# the private key.
|
|
user_verification=UserVerificationRequirement.DISCOURAGED,
|
|
|
|
# Don't support extensions for now.
|
|
extensions=None,
|
|
)
|
|
|
|
return PluginRequest(
|
|
origin=j["origin"],
|
|
public_key_credential_request=req,
|
|
)
|
|
|
|
|
|
def encode_plugin_response(a: AuthenticationResponse) -> bytes:
|
|
"""Encode a plugin response to JSON."""
|
|
return json.dumps({
|
|
"type": "getResponse",
|
|
"responseData": dict(a),
|
|
"error": None,
|
|
}).encode('utf-8')
|
|
|
|
|
|
class DiscardInteraction(UserInteraction):
|
|
"""Handler when user interaction is required.
|
|
|
|
This plugin's stdin/stdout talks with git-credential-luci, so we fail
|
|
actions that require user input (this plugin shouldn't set any flag
|
|
that require user interaction).
|
|
"""
|
|
|
|
def prompt_up(self):
|
|
sys.stderr.write("\nTouch your blinking security key to continue.\n\n")
|
|
|
|
def request_pin(self, permissions, rp_id):
|
|
# This plugin shouldn't set assertion flags that will require
|
|
# PIN entry.
|
|
return None
|
|
|
|
def request_uv(self, permissions, rp_id):
|
|
# Don't allow user verification (UV), because we don't allow PIN
|
|
# entry, UV will fail.
|
|
return False
|
|
|
|
|
|
def get_clients(origin: str) -> list[tuple[WebAuthnClient, str]]:
|
|
"""Return WebAuthn clients.
|
|
|
|
The return value is a list of (WebAuthnClient, client description)
|
|
where we can send assertion requests to.
|
|
|
|
On Windows, this method returns a client that talks with Win32 API
|
|
if available.
|
|
"""
|
|
client_data_collector = DefaultClientDataCollector(origin)
|
|
|
|
# Use Windows WebAuthn API if available.
|
|
if WindowsClient and WindowsClient.is_available():
|
|
logging.info("Using WindowsClient")
|
|
return [(WindowsClient(client_data_collector), "WindowsWebAuthn")]
|
|
|
|
user_interaction = DiscardInteraction()
|
|
clients = []
|
|
for dev in CtapHidDevice.list_devices():
|
|
desc = dev.descriptor
|
|
desc_str = (f'CtapHidDevice {desc.product_name}'
|
|
f' (VID 0x{desc.vid:04x},'
|
|
f' PID 0x{desc.pid:04x}) at {desc.path}')
|
|
logging.info("Found %s", desc_str)
|
|
clients.append((
|
|
Fido2Client(
|
|
dev,
|
|
client_data_collector=client_data_collector,
|
|
user_interaction=user_interaction,
|
|
),
|
|
desc_str,
|
|
))
|
|
|
|
return clients
|
|
|
|
|
|
def assert_on_client(*, client: WebAuthnClient, client_desc: str,
|
|
request: PublicKeyCredentialRequestOptions, cancel: Event):
|
|
try:
|
|
return client.get_assertion(request, cancel)
|
|
except Exception as e:
|
|
if not cancel.is_set():
|
|
logging.error("Assertion failed on %s: %s", client_desc, e)
|
|
return None
|
|
|
|
|
|
@contextmanager
|
|
def set_event_on_signal(signum: int, event: Event):
|
|
"""Return a context manager that sets `event` when `signum` is signaled."""
|
|
original_handler = signal.getsignal(signum)
|
|
|
|
def handler(signum, _):
|
|
logging.info("Signal %s received.", signal.strsignal(signum))
|
|
event.set()
|
|
|
|
signal.signal(signum, handler)
|
|
try:
|
|
yield
|
|
finally:
|
|
signal.signal(signum, original_handler)
|
|
|
|
|
|
def main():
|
|
logging.basicConfig(level=logging.INFO)
|
|
plugin_req = parse_plugin_request(plugin_read(sys.stdin.buffer))
|
|
|
|
clients = get_clients(plugin_req.origin)
|
|
if not clients:
|
|
logging.error("No available FIDO devices.")
|
|
sys.exit(_EXIT_NO_FIDO2_DEVICES)
|
|
|
|
# Race and retrieve the first successful assertion.
|
|
outcome = None
|
|
cancel = Event()
|
|
with set_event_on_signal(signal.SIGINT, cancel), set_event_on_signal(
|
|
signal.SIGTERM,
|
|
cancel), ThreadPoolExecutor(max_workers=len(clients)) as executor:
|
|
futures = [
|
|
executor.submit(assert_on_client,
|
|
client=client,
|
|
client_desc=desc,
|
|
request=plugin_req.public_key_credential_request,
|
|
cancel=cancel) for client, desc in clients
|
|
]
|
|
for future in as_completed(futures):
|
|
if result := future.result():
|
|
outcome = result
|
|
cancel.set()
|
|
break
|
|
|
|
if not outcome:
|
|
logging.error("All assertions failed or timed out.")
|
|
sys.exit(_EXIT_ALL_ASSERTIONS_FAILED)
|
|
|
|
assertions = outcome.get_assertions()
|
|
if not assertions:
|
|
logging.error("No matching credential.")
|
|
sys.exit(_EXIT_NO_MATCHING_CRED)
|
|
elif len(assertions) > 1:
|
|
logging.warning(
|
|
"Multiple assertions returned for rp_id %s, selecting the first one.",
|
|
plugin_req.public_key_credential_request.rp_id)
|
|
|
|
# Write the first completed assertion.
|
|
plugin_write(sys.stdout.buffer,
|
|
encode_plugin_response(outcome.get_response(0)))
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|