Validate check requests from Maxsight
This topic provides examples on how your integration can validate check requests coming from Maxsight.
For information on how Maxsight signs requests, see How Maxsight signs check requests.
To implement correct request validation for check requests coming from Maxsight, run one of the following code examples:
Python code example:
This code example builds on the Python library flask_httpauth, inheriting from the HTTPAuth class.
"""HTTP Signature validation for incoming check requests.

Builds on the ``flask_httpauth`` library by inheriting from its ``HTTPAuth``
class. Only symmetric (HMAC-SHA256) signatures are supported.
"""

import base64
import calendar
import hashlib
import hmac
import logging
import time
from email.utils import parsedate

from flask import request
from flask_httpauth import HTTPAuth
from werkzeug.datastructures import Authorization
from werkzeug.exceptions import Unauthorized


class HTTPSignatureInvalid(Unauthorized):
    """401 error raised whenever a request fails signature validation."""

    description = "Invalid signature in request."

    def __init__(self, description=None):
        super().__init__(description)
        self.description = description or self.description


class HTTPSignatureAuth(HTTPAuth):
    """Authenticate requests carrying an HMAC-SHA256 ``Signature`` header.

    Register a key lookup with :meth:`resolve_key` and protect views with
    :meth:`signed_headers_required`.
    """

    def __init__(
        self,
        scheme="Signature",
        realm=None,
        required_headers=None,
        require_digest: bool = True,
        max_delay_time: int = 30,
    ):
        """Configure the validator.

        Args:
            scheme: Authorization scheme name passed to ``HTTPAuth``.
            realm: Authentication realm passed to ``HTTPAuth``.
            required_headers: Header names that must be covered by the
                signature. Defaults to ``["(request-target)", "date"]``.
            require_digest: When true, requests with a body must sign a
                ``digest`` header that matches the body.
            max_delay_time: Maximum allowed difference, in seconds, between
                the ``date`` header and the time of authentication.
        """
        super().__init__(scheme, realm)
        if required_headers is None:
            required_headers = ["(request-target)", "date"]
        self.required_headers = required_headers
        self.require_digest = require_digest
        self.key_resolver = None
        self.max_delay_time = max_delay_time

    def resolve_key(self, func):
        """Register ``func`` as the key resolver; usable as a decorator.

        ``func`` is called as ``func(key_id=...)`` and must return the
        base64-encoded secret for that key ID, or ``None`` if it is unknown.
        """
        self.key_resolver = func
        return func

    def signed_headers_required(self, f):
        """Decorate a view so that it requires a valid request signature."""
        return self.login_required(f)

    @staticmethod
    def _get_bytes_to_sign(headers):
        """Build the signing string for the current request.

        Args:
            headers: Lower-cased header names, in the order they were signed.

        Returns:
            The newline-joined ``name: value`` lines, encoded as bytes.

        Raises:
            HTTPSignatureInvalid: If a signed header is absent from the
                request.
        """
        result = []
        for header in headers:
            if header == "(request-target)":
                path_url = request.full_path if request.query_string else request.path
                result.append(f"(request-target): {request.method.lower()} {path_url}")
                continue

            if header == "host":
                value = request.headers.get("host", request.host)
            else:
                value = request.headers.get(header)
            if value is None:
                # A header named in the signature but missing from the request
                # can never verify; reject with 401 instead of a KeyError.
                logging.warning("Missing required header `%s` in signature.", header)
                raise HTTPSignatureInvalid()
            result.append(f"{header}: {value}")
        return "\n".join(result).encode()

    def _check_digest(self, headers):
        """Verify that the signed ``digest`` header matches the request body."""
        if "digest" not in headers:
            logging.warning("Missing required header `digest` in signature.")
            raise HTTPSignatureInvalid()

        encoded_digest = base64.b64encode(hashlib.sha256(request.data).digest()).decode()
        # ``.get`` so that an absent header is reported as a mismatch (401).
        expected_digest = request.headers.get("digest")
        computed_digest = f"SHA-256={encoded_digest}"
        if expected_digest != computed_digest:
            logging.warning(
                "Digest header does not match request body.\n"
                " Expected: %s\n"
                " Computed: %s",
                expected_digest,
                computed_digest,
            )
            raise HTTPSignatureInvalid()

    def _check_date(self, authentication_time):
        """Verify that the ``date`` header is close to ``authentication_time``."""
        parsed = parsedate(request.headers.get("date", ""))
        try:
            # The struct_time returned by parsedate would be converted to epoch
            # time using the system TZ by time.mktime(), so we use
            # calendar.timegm() to ensure it's consistently UTC.
            supplied_date = calendar.timegm(parsed)
        except (TypeError, ValueError, OverflowError):
            # parsedate() returns None for a missing or malformed date.
            logging.warning("Date on request too far away from current time.")
            raise HTTPSignatureInvalid() from None

        # Require supplied date to be close to the current time
        if abs(authentication_time - supplied_date) > self.max_delay_time:
            logging.warning("Date on request too far away from current time.")
            raise HTTPSignatureInvalid()

    def authenticate(self, auth: Authorization, _pw):
        """Validate the signature on the current request.

        Args:
            auth: Parsed ``Authorization`` header, or ``None`` if absent.
            _pw: Unused; present to match the ``HTTPAuth`` interface.

        Returns:
            ``True`` when the signature is valid.

        Raises:
            HTTPSignatureInvalid: On any validation failure.
        """
        # Get the current time as early as possible
        authentication_time = time.time()

        if auth is None:
            logging.warning("No authentication provided.")
            raise HTTPSignatureInvalid()

        assert (
            self.key_resolver is not None
        ), "Key resolver should be set before authenticating request."

        sig_dict = auth.parameters
        for field in ("keyId", "algorithm", "signature"):
            # Also reject parameters that are present but carry no value.
            if sig_dict.get(field) is None:
                logging.warning("Malformed authorisation header.")
                raise HTTPSignatureInvalid()

        if sig_dict["algorithm"] not in {"hmac-sha256", "hs2019"}:
            logging.warning("Unsupported signature algorithm: %s", sig_dict["algorithm"])
            raise HTTPSignatureInvalid()

        # We deviate from the spec here, which says the default should be
        # '(created)'. However, this is only valid for asymmetric signatures,
        # which we don't support.
        signed_headers = sig_dict.get("headers")
        if signed_headers is None:
            signed_headers = "date"
        headers = [header.lower() for header in signed_headers.split(" ")]

        for header in self.required_headers:
            if header not in headers:
                logging.warning("Missing required header `%s` in signature.", header)
                raise HTTPSignatureInvalid()

        if self.require_digest and request.data:
            self._check_digest(headers)

        if "date" in headers:
            self._check_date(authentication_time)

        try:
            expected_signature = base64.b64decode(sig_dict["signature"].encode())
        except ValueError:
            # binascii.Error (a ValueError) is raised for malformed base64.
            logging.warning("Malformed authorisation header.")
            raise HTTPSignatureInvalid() from None

        bytes_to_sign = self._get_bytes_to_sign(headers)

        # Check for an unknown key BEFORE decoding: b64decode(None) raises
        # TypeError and never returns None.
        encoded_key = self.key_resolver(key_id=sig_dict["keyId"])
        if encoded_key is None:
            logging.warning(
                "Unknown key ID `%s` when verifying signature.", sig_dict["keyId"]
            )
            raise HTTPSignatureInvalid()
        key = base64.b64decode(encoded_key)

        computed_signature = hmac.new(
            key, bytes_to_sign, digestmod=hashlib.sha256
        ).digest()

        # Constant-time comparison to avoid leaking signature bytes via timing.
        if not hmac.compare_digest(expected_signature, computed_signature):
            logging.warning("Signature on request does not match expected signature.")
            raise HTTPSignatureInvalid()

        return True
TypeScript code example:
import * as crypto from 'crypto';
import * as express from 'express';
import * as base64 from 'base-64';
import * as utf8 from '@stablelib/utf8';

/** Error thrown whenever a request fails signature validation. */
class HTTPSignatureInvalid extends Error {
  constructor(message: string = "Invalid signature in request.") {
    super(message);
    this.name = "HTTPSignatureInvalid";
    // Keep `instanceof` working when compiled to ES5.
    Object.setPrototypeOf(this, new.target.prototype);
  }
}

/**
 * Decode a base64 string into raw bytes.
 *
 * @returns the decoded bytes, or `null` when the input is not valid base64.
 */
function decodeBase64(value: string): Buffer | null {
  try {
    // base-64 returns a byte string, so it must be read back as 'binary'.
    return Buffer.from(base64.decode(value), 'binary');
  } catch (err) {
    return null;
  }
}

/** Validates HMAC-SHA256 HTTP Signatures on incoming express requests. */
class HTTPSignatureAuth {
  scheme: string;
  realm: string | null;
  requiredHeaders: string[];
  requireDigest: boolean;
  keyResolver: any;
  maxDelayTime: number;

  /**
   * @param scheme          Authorization scheme name.
   * @param realm           Authentication realm, if any.
   * @param requiredHeaders Header names that must be covered by the signature.
   * @param requireDigest   When true, requests with a body must sign a matching `digest` header.
   * @param maxDelayTime    Maximum allowed difference, in seconds, between the `date` header and now.
   */
  constructor(
    scheme = "Signature",
    realm: string | null = null,
    requiredHeaders = ["(request-target)", "date"],
    requireDigest = true,
    maxDelayTime = 30,
  ) {
    this.scheme = scheme;
    this.realm = realm;
    this.requiredHeaders = requiredHeaders;
    this.requireDigest = requireDigest;
    this.keyResolver = null;
    this.maxDelayTime = maxDelayTime;
  }

  /**
   * Register the key resolver. It is called with the key ID and must return
   * the base64-encoded secret, or `null`/`undefined` when the ID is unknown.
   */
  resolveKey(func: any) {
    this.keyResolver = func;
    return func;
  }

  /**
   * Build the signing string for a request.
   *
   * @param headers Lower-cased header names, in the order they were signed.
   * @param req     The request being validated. Pass the live request; the
   *                default only exists for backward compatibility.
   * @throws HTTPSignatureInvalid if a signed header is absent from the request.
   */
  static _getBytesToSign(headers: string[], req: express.Request = express.request): Uint8Array {
    const lines: string[] = [];
    for (const header of headers) {
      if (header === "(request-target)") {
        // originalUrl keeps the query string, which is part of the signed target.
        lines.push(`(request-target): ${req.method.toLowerCase()} ${req.originalUrl}`);
        continue;
      }
      const raw = req.headers[header];
      if (raw === undefined) {
        console.warn(`Missing required header ${header} in signature.`);
        throw new HTTPSignatureInvalid();
      }
      const value = Array.isArray(raw) ? raw.join(", ") : raw;
      lines.push(`${header}: ${value}`);
    }
    return utf8.encode(lines.join("\n"));
  }

  /**
   * Return the raw request body, or `null` when the request has no body.
   *
   * The digest can only be verified against the exact bytes received, so the
   * body must be exposed as a Buffer or string (e.g. via `express.raw()`).
   *
   * @throws HTTPSignatureInvalid if a body is present only in parsed form.
   */
  private static _getRawBody(req: express.Request): Buffer | null {
    const body: unknown = req.body;
    if (body === undefined || body === null) {
      return null;
    }
    if (Buffer.isBuffer(body)) {
      return body.length > 0 ? body : null;
    }
    if (typeof body === "string") {
      return body.length > 0 ? Buffer.from(body, "utf8") : null;
    }
    // Body parsers leave an empty object when the request carried no body.
    if (typeof body === "object" && Object.keys(body as object).length === 0) {
      return null;
    }
    // Fail closed: a parsed body cannot be hashed reliably.
    console.warn("Request body is not available as raw bytes; cannot verify digest.");
    throw new HTTPSignatureInvalid();
  }

  /**
   * Validate the signature on a request.
   *
   * @param auth Parsed Authorization header exposing a `parameters` object, or null if absent.
   * @param _pw  Unused; kept for interface compatibility.
   * @param req  The request being validated. Pass the live request; the
   *             default only exists for backward compatibility.
   * @returns `true` when the signature is valid.
   * @throws HTTPSignatureInvalid on any validation failure.
   */
  authenticate(auth: any, _pw: any, req: express.Request = express.request): boolean {
    // Get the current time as early as possible
    const authenticationTime = Date.now() / 1000;

    if (auth === null || auth === undefined) {
      console.warn("No authentication provided.");
      throw new HTTPSignatureInvalid();
    }
    if (this.keyResolver === null) {
      throw new Error("Key resolver should be set before authenticating request.");
    }

    const sigDict = auth.parameters;
    if (sigDict === null || typeof sigDict !== "object") {
      console.warn("Malformed authorisation header.");
      throw new HTTPSignatureInvalid();
    }
    for (const field of ["keyId", "algorithm", "signature"]) {
      if (typeof sigDict[field] !== "string") {
        console.warn("Malformed authorisation header.");
        throw new HTTPSignatureInvalid();
      }
    }

    if (!["hmac-sha256", "hs2019"].includes(sigDict["algorithm"])) {
      console.warn(`Unsupported signature algorithm: ${sigDict["algorithm"]}`);
      throw new HTTPSignatureInvalid();
    }

    // We deviate from the spec here, which says the default should be
    // '(created)'. However, this is only valid for asymmetric signatures,
    // which we don't support.
    const signedHeaders: string = typeof sigDict["headers"] === "string" ? sigDict["headers"] : "date";
    const headers = signedHeaders.split(" ").map((header: string) => header.toLowerCase());

    for (const header of this.requiredHeaders) {
      if (!headers.includes(header)) {
        console.warn(`Missing required header ${header} in signature.`);
        throw new HTTPSignatureInvalid();
      }
    }

    if (this.requireDigest) {
      const rawBody = HTTPSignatureAuth._getRawBody(req);
      if (rawBody !== null) {
        if (!headers.includes("digest")) {
          console.warn("Missing required header `digest` in signature.");
          throw new HTTPSignatureInvalid();
        }
        const encodedDigest = crypto.createHash('sha256').update(rawBody).digest('base64');
        const expectedDigest = req.headers["digest"];
        const computedDigest = `SHA-256=${encodedDigest}`;
        if (expectedDigest !== computedDigest) {
          console.warn(`Digest header does not match request body.\nExpected: ${expectedDigest}\nComputed: ${computedDigest}`);
          throw new HTTPSignatureInvalid();
        }
      }
    }

    if (headers.includes("date")) {
      const suppliedDate = Date.parse(String(req.headers["date"])) / 1000;
      // Date.parse yields NaN for a bad date, and every comparison with NaN is
      // false, so it must be rejected explicitly or the check is bypassed.
      if (Number.isNaN(suppliedDate) || Math.abs(authenticationTime - suppliedDate) > this.maxDelayTime) {
        console.warn("Date on request too far away from current time.");
        throw new HTTPSignatureInvalid();
      }
    }

    const expectedSignature = decodeBase64(sigDict["signature"]);
    if (expectedSignature === null) {
      console.warn("Malformed authorisation header.");
      throw new HTTPSignatureInvalid();
    }

    const bytesToSign = HTTPSignatureAuth._getBytesToSign(headers, req);

    // Check for an unknown key BEFORE decoding it.
    const encodedKey = this.keyResolver(sigDict["keyId"]);
    if (encodedKey === null || encodedKey === undefined) {
      console.warn(`Unknown key ID ${sigDict["keyId"]} when verifying signature.`);
      throw new HTTPSignatureInvalid();
    }
    const key = decodeBase64(encodedKey);
    if (key === null) {
      throw new Error("Key resolver returned a key that is not valid base64.");
    }

    const computedSignature = crypto.createHmac('sha256', key).update(bytesToSign).digest();

    // timingSafeEqual throws on a length mismatch, so compare lengths first.
    const signatureValid =
      expectedSignature.length === computedSignature.length &&
      crypto.timingSafeEqual(expectedSignature, computedSignature);
    if (!signatureValid) {
      console.warn("Signature on request does not match expected signature.");
      throw new HTTPSignatureInvalid();
    }
    return true;
  }
}