Skip to main content

Validate check requests from Maxsight

This topic provides examples of how your integration can validate check requests coming from Maxsight.

For information on how Maxsight signs requests, see How Maxsight signs check requests.

To implement correct request validation for check requests coming from Maxsight, adapt one of the following code examples to your integration:

  • Python code example:

    This code example builds on the Python library flask_httpauth by inheriting from its HTTPAuth class.

    import base64
    import calendar
    import hashlib
    import hmac
    import logging
    import time
    from email.utils import parsedate
    
    from flask import request
    from flask_httpauth import HTTPAuth
    from werkzeug.datastructures import Authorization
    from werkzeug.exceptions import Unauthorized
    
    
    class HTTPSignatureInvalid(Unauthorized):
        """Error raised when the signature on an incoming request cannot be verified."""

        description = "Invalid signature in request."

        def __init__(self, description=None):
            """Create the error, falling back to the class-level description.

            Args:
                description: Optional message overriding the default description.
            """
            super().__init__(description)
            # Re-read the attribute after the base initialiser has run so that
            # the fallback reflects whatever the base class stored.
            if not description:
                description = self.description
            self.description = description
    
    
    class HTTPSignatureAuth(HTTPAuth):
        """HTTPAuth scheme that verifies HMAC-SHA256 signatures on Flask requests.

        The signature parameters (`keyId`, `algorithm`, `signature` and the
        optional `headers` list) are read from the parsed Authorization header.
        The signing key is looked up through the callback registered with
        `resolve_key`.
        """

        def __init__(
            self, scheme="Signature", realm=None, required_headers=None, require_digest:bool=True, max_delay_time:int=30
        ):
            """Configure the verifier.

            Args:
                scheme: Authorization scheme name passed to HTTPAuth.
                realm: Authentication realm passed to HTTPAuth.
                required_headers: Header names that must be covered by the
                    signature. Defaults to `["(request-target)", "date"]`.
                require_digest: When true, requests with a body must sign a
                    `digest` header that matches the SHA-256 of the body.
                max_delay_time: Maximum allowed difference, in seconds, between
                    the signed `date` header and the time of verification.
            """
            super().__init__(scheme, realm)

            if required_headers is None:
                required_headers = ["(request-target)", "date"]

            self.required_headers = required_headers
            self.require_digest = require_digest
            self.key_resolver = None
            self.max_delay_time = max_delay_time

        def resolve_key(self, func):
            """Register `func` as the key lookup callback and return it unchanged.

            The callback is called as `func(key_id=...)` and must return the
            base64-encoded shared secret, or None when the key ID is unknown.
            """
            self.key_resolver = func
            return func

        def signed_headers_required(self, f):
            """Decorate view `f` so that it requires a valid request signature."""
            return self.login_required(f)

        @staticmethod
        def _get_bytes_to_sign(headers):
            """Build the signing string for the current request.

            Args:
                headers: Lower-cased header names, in the order listed in the
                    signature's `headers` parameter.

            Returns:
                The encoded signing string: one `name: value` line per header,
                joined with newlines.
            """
            result = []
            for header in headers:
                if header == "(request-target)":
                    # Include the query string only when one was sent, so a bare
                    # path is not signed with a trailing "?".
                    path_url = request.full_path if request.query_string else request.path
                    result.append(f"(request-target): {request.method.lower()} {path_url}")
                else:
                    if header == "host":
                        value = request.headers.get("host", request.host)
                    else:
                        value = request.headers[header]
                    result.append(f"{header}: {value}")

            return "\n".join(result).encode()

        def authenticate(self, auth: Authorization, _pw):
            """Verify the signature carried by the current request.

            Args:
                auth: Parsed Authorization header, or None when absent.
                _pw: Unused; present to match the HTTPAuth interface.

            Returns:
                True when the signature is valid.

            Raises:
                HTTPSignatureInvalid: If the header is missing or malformed, the
                    algorithm is unsupported, a required header is not signed,
                    the digest or date check fails, the key ID is unknown, or
                    the signature does not match.
            """
            # Get the current time as early as possible
            authentication_time = time.time()
            if auth is None:
                logging.warning("No authentication provided.")
                raise HTTPSignatureInvalid()

            assert (
                self.key_resolver is not None
            ), "Key resolver should be set before authenticating request."

            sig_dict = auth.parameters
            for field in "keyId", "algorithm", "signature":
                if field not in sig_dict:
                    logging.warning("Malformed authorisation header.")
                    raise HTTPSignatureInvalid()

            if sig_dict["algorithm"] not in {"hmac-sha256", "hs2019"}:
                logging.warning(f'Unsupported signature algorithm: {sig_dict["algorithm"]}')
                raise HTTPSignatureInvalid()

            # We deviate from the spec here, which says the default should be '(created)'. However, this is only valid
            # for asymmetric signatures, which we don't support.
            headers = [
                header.lower() for header in sig_dict.get("headers", "date").split(" ")
            ]

            for header in self.required_headers:
                if header not in headers:
                    logging.warning(f"Missing required header `{header}` in signature.")
                    raise HTTPSignatureInvalid()

            if self.require_digest and request.data:
                if "digest" not in headers:
                    logging.warning("Missing required header `digest` in signature.")
                    raise HTTPSignatureInvalid()

                encoded_digest = base64.b64encode(
                    hashlib.sha256(request.data).digest()
                ).decode()

                # A signed-but-absent digest header yields None here and is
                # rejected by the comparison below as a signature failure.
                expected_digest = request.headers.get("digest")
                computed_digest = f"SHA-256={encoded_digest}"
                if expected_digest != computed_digest:
                    logging.warning(
                        f"Digest header does not match request body.\n"
                        f"    Expected: {expected_digest}\n"
                        f"    Computed: {computed_digest}"
                    )
                    raise HTTPSignatureInvalid()

            if "date" in headers:
                # parsedate returns None for input it cannot parse; reject that
                # (and a missing header) instead of crashing in timegm().
                date_header = request.headers.get("date")
                parsed_date = parsedate(date_header) if date_header else None
                if parsed_date is None:
                    logging.warning("Missing or unparseable `date` header on request.")
                    raise HTTPSignatureInvalid()

                # The struct_time returned by parsedate would be converted to
                # epoch time using the system TZ by time.mktime(), so we use
                # calendar.timegm() to ensure it's consistently UTC
                supplied_date = calendar.timegm(parsed_date)

                # Require supplied date to be close to the current time
                if abs(authentication_time - supplied_date) > self.max_delay_time:
                    logging.warning("Date on request too far away from current time.")
                    raise HTTPSignatureInvalid()

            try:
                expected_signature = base64.b64decode(sig_dict["signature"].encode())
            except ValueError:
                # binascii.Error (a ValueError subclass) signals malformed base64.
                logging.warning("Signature in authorisation header is not valid base64.")
                raise HTTPSignatureInvalid() from None

            bytes_to_sign = self._get_bytes_to_sign(headers)

            # Check for an unknown key before decoding: b64decode(None) would
            # raise TypeError and the rejection below would never be reached.
            encoded_key = self.key_resolver(key_id=sig_dict["keyId"])
            if encoded_key is None:
                logging.warning(f'Unknown key ID `{sig_dict["keyId"]}` when verifying signature.')
                raise HTTPSignatureInvalid()
            key = base64.b64decode(encoded_key)

            computed_signature = hmac.new(
                key, bytes_to_sign, digestmod=hashlib.sha256
            ).digest()

            # Constant-time comparison of the supplied and computed signatures.
            signature_valid = hmac.compare_digest(expected_signature, computed_signature)
            if not signature_valid:
                logging.warning("Signature on request does not match expected signature.")
                raise HTTPSignatureInvalid()

            return True
  • TypeScript code example:

    import * as crypto from 'crypto';
    import * as express from 'express';
    import * as base64 from 'base-64';
    import * as utf8 from '@stablelib/utf8';
    
    /**
     * Error thrown when the signature on an incoming request cannot be verified.
     */
    class HTTPSignatureInvalid extends Error {
        /**
         * @param message Optional human-readable reason for the failure.
         */
        constructor(message?: string) {
            super(message);
            // When compiled to ES5, subclasses of Error lose their prototype
            // chain; restore it so `instanceof HTTPSignatureInvalid` works.
            Object.setPrototypeOf(this, new.target.prototype);
            this.name = "HTTPSignatureInvalid";
        }
    }
    
    /**
     * Verifies HMAC-SHA256 HTTP signatures on Express requests.
     *
     * The signing key is looked up through the callback registered with
     * `resolveKey`, which receives the key ID and must return the
     * base64-encoded shared secret, or null/undefined when the key is unknown.
     */
    class HTTPSignatureAuth {
        scheme: string;
        realm: string | null;
        requiredHeaders: string[];
        requireDigest: boolean;
        keyResolver: any;
        maxDelayTime: number;

        /**
         * @param scheme Authorization scheme name.
         * @param realm Authentication realm, or null.
         * @param requiredHeaders Header names the signature must cover.
         * @param requireDigest When true, requests with a body must sign a
         *     `digest` header matching the SHA-256 of the body.
         * @param maxDelayTime Maximum allowed difference, in seconds, between
         *     the signed `date` header and the time of verification.
         */
        constructor(scheme = "Signature", realm: string | null = null, requiredHeaders = ["(request-target)", "date"], requireDigest = true, maxDelayTime = 30) {
            this.scheme = scheme;
            this.realm = realm;
            this.requiredHeaders = requiredHeaders;
            this.requireDigest = requireDigest;
            this.keyResolver = null;
            this.maxDelayTime = maxDelayTime;
        }

        /**
         * Registers `func` as the key lookup callback and returns it unchanged.
         */
        resolveKey(func: any) {
            this.keyResolver = func;
            return func;
        }

        /**
         * Builds the signing string for a request.
         *
         * @param headers Lower-cased header names, in the order listed in the
         *     signature's `headers` parameter.
         * @param req The request being verified. Callers should always pass the
         *     live request; the default (`express.request`) is only the request
         *     prototype and is kept for backward compatibility.
         * @returns The UTF-8 encoded signing string.
         * @throws HTTPSignatureInvalid If a signed header is absent from the request.
         */
        static _getBytesToSign(headers: string[], req: express.Request = express.request) {
            const result: string[] = [];
            for (const header of headers) {
                if (header === "(request-target)") {
                    // originalUrl keeps the query string, matching the Python
                    // example, which signs the path together with the query.
                    const pathUrl = req.originalUrl;
                    result.push(`(request-target): ${req.method.toLowerCase()} ${pathUrl}`);
                } else {
                    const value = req.headers[header];
                    if (value === undefined) {
                        // Never sign the literal string "undefined".
                        console.warn(`Signed header ${header} is missing from request.`);
                        throw new HTTPSignatureInvalid();
                    }
                    const joined = Array.isArray(value) ? value.join(", ") : value;
                    result.push(`${header}: ${joined}`);
                }
            }
            return utf8.encode(result.join("\n"));
        }

        /**
         * Verifies the signature carried by a request.
         *
         * @param auth Parsed Authorization header exposing a `parameters`
         *     object, or null/undefined when absent.
         * @param _pw Unused.
         * @param req The request being verified. Callers should always pass the
         *     live request; the default (`express.request`) is only the request
         *     prototype and is kept for backward compatibility.
         * @returns true when the signature is valid.
         * @throws HTTPSignatureInvalid If any verification step fails.
         * @throws Error If no key resolver has been registered.
         */
        authenticate(auth: any, _pw: any, req: express.Request = express.request) {
            // Get the current time as early as possible
            const authenticationTime = Date.now() / 1000;
            if (auth === null || auth === undefined) {
                console.warn("No authentication provided.");
                throw new HTTPSignatureInvalid();
            }
            if (this.keyResolver === null) {
                throw new Error("Key resolver should be set before authenticating request.");
            }
            const sigDict = auth.parameters;
            if (sigDict === null || typeof sigDict !== "object") {
                console.warn("Malformed authorisation header.");
                throw new HTTPSignatureInvalid();
            }
            for (const field of ["keyId", "algorithm", "signature"]) {
                if (!(field in sigDict)) {
                    console.warn("Malformed authorisation header.");
                    throw new HTTPSignatureInvalid();
                }
            }
            if (!["hmac-sha256", "hs2019"].includes(sigDict["algorithm"])) {
                console.warn(`Unsupported signature algorithm: ${sigDict["algorithm"]}`);
                throw new HTTPSignatureInvalid();
            }
            // Default to signing only `date` when no header list is supplied.
            const headerList: string = typeof sigDict["headers"] === "string" ? sigDict["headers"] : "date";
            const headers = headerList.split(" ").map((header: string) => header.toLowerCase());
            for (const header of this.requiredHeaders) {
                if (!headers.includes(header)) {
                    console.warn(`Missing required header ${header} in signature.`);
                    throw new HTTPSignatureInvalid();
                }
            }
            // NOTE(review): hashing needs the raw request body (string or Buffer,
            // e.g. from express.raw()); confirm the body parser in use. An empty
            // string/Buffer is treated as "no body", as in the Python example.
            const body: any = req.body;
            const hasBody = (typeof body === "string" || Buffer.isBuffer(body)) ? body.length > 0 : Boolean(body);
            if (this.requireDigest && hasBody) {
                if (!headers.includes("digest")) {
                    console.warn("Missing required header `digest` in signature.");
                    throw new HTTPSignatureInvalid();
                }
                const encodedDigest = crypto.createHash('sha256').update(body).digest('base64');
                const expectedDigest = req.headers["digest"];
                const computedDigest = `SHA-256=${encodedDigest}`;
                if (expectedDigest !== computedDigest) {
                    console.warn(`Digest header does not match request body.\nExpected: ${expectedDigest}\nComputed: ${computedDigest}`);
                    throw new HTTPSignatureInvalid();
                }
            }
            if (headers.includes("date")) {
                const dateHeader = req.headers["date"];
                const suppliedDate = typeof dateHeader === "string" ? Date.parse(dateHeader) / 1000 : NaN;
                // Date.parse yields NaN for bad input, and any comparison with
                // NaN is false, so reject explicitly or the check is bypassed.
                if (Number.isNaN(suppliedDate)) {
                    console.warn("Missing or unparseable `date` header on request.");
                    throw new HTTPSignatureInvalid();
                }
                if (Math.abs(authenticationTime - suppliedDate) > this.maxDelayTime) {
                    console.warn("Date on request too far away from current time.");
                    throw new HTTPSignatureInvalid();
                }
            }
            // Decode straight to bytes; going through a binary string and
            // re-encoding it as UTF-8 would corrupt bytes above 0x7f.
            const expectedSignature = Buffer.from(String(sigDict["signature"]), 'base64');
            const bytesToSign = HTTPSignatureAuth._getBytesToSign(headers, req);
            // Check for an unknown key before decoding it.
            const encodedKey = this.keyResolver(sigDict["keyId"]);
            if (encodedKey === null || encodedKey === undefined) {
                console.warn(`Unknown key ID ${sigDict["keyId"]} when verifying signature.`);
                throw new HTTPSignatureInvalid();
            }
            const key = Buffer.from(encodedKey, 'base64');
            const computedSignature = crypto.createHmac('sha256', key).update(bytesToSign).digest();
            // timingSafeEqual throws when the lengths differ, so compare them first.
            const signatureValid = expectedSignature.length === computedSignature.length
                && crypto.timingSafeEqual(expectedSignature, computedSignature);
            if (!signatureValid) {
                console.warn("Signature on request does not match expected signature.");
                throw new HTTPSignatureInvalid();
            }
            return true;
        }
    }

Additional information