Skip to main content

Authenticating requests

In order to protect the integrity and confidentiality of personal information and to ensure your integration is only used by Maxsight, requests are authenticated using the HTTP signatures draft standard, version 12.

When implementing an integration, you must generate a 32-byte secret key to sign and verify requests and responses that your integration handles, unless specifically noted in this topic. The key ID to use should be the first eight characters of the Base64 encoding of this key. The only supported algorithm is HMAC-SHA256, and the only supported Digest is SHA-256.

You must sign and verify at least the request target (the lowercase HTTP verb and full path of the initial request) and the date header. Requests with a body, for example POST requests, must also include and sign a digest header. The date header must be set to the current date and time.

Requests with any of the following should be rejected:

  • A missing or incorrect request signature.

  • A digest header that's missing or doesn't match the request body.

  • A date header that's missing or more than 30 seconds from the current time.

Note

You must verify signatures in incoming requests. Signing your outgoing requests alone is not adequate. The validation suite fails integrations that don't correctly verify the signature of incoming requests.

Implementing authentication code examples

The following code snippets provide a method for implementing request authentication for requests coming from Maxsight.

Python code example

This code example builds on the Python library flask_httpauth by subclassing its HTTPAuth class.

import base64
import calendar
import hashlib
import hmac
import logging
import time
from email.utils import parsedate

from flask import request
from flask_httpauth import HTTPAuth
from werkzeug.datastructures import Authorization
from werkzeug.exceptions import Unauthorized


class HTTPSignatureInvalid(Unauthorized):
    """Raised when a request's HTTP signature cannot be verified.

    Subclasses ``Unauthorized``; the class-level ``description`` is used
    whenever no (truthy) description is supplied by the caller.
    """

    description = "Invalid signature in request."

    def __init__(self, description=None):
        super().__init__(description)
        # Fall back to whatever description is already visible on the
        # instance (set by the base class or the class-level default).
        if not description:
            description = self.description
        self.description = description


class HTTPSignatureAuth(HTTPAuth):
    """Authenticate Flask requests signed with an HMAC-SHA256 HTTP signature.

    The signature parameters (``keyId``, ``algorithm``, ``signature`` and
    optionally ``headers``) are read from the parsed ``Authorization``
    header. Every verification failure raises ``HTTPSignatureInvalid``.

    A key resolver must be registered with :meth:`resolve_key` before any
    request is authenticated.
    """

    def __init__(
        self,
        scheme="Signature",
        realm=None,
        required_headers=None,
        require_digest: bool = True,
        max_delay_time: int = 30,
    ):
        """Configure the verifier.

        Args:
            scheme: Authorization scheme name passed to ``HTTPAuth``.
            realm: Realm passed to ``HTTPAuth``.
            required_headers: Header names that must be covered by the
                signature. Defaults to ``["(request-target)", "date"]``.
            require_digest: When true, requests with a body must sign a
                ``digest`` header that matches the body.
            max_delay_time: Maximum allowed difference, in seconds, between
                the ``date`` header and the time of authentication.
        """
        super().__init__(scheme, realm)

        if required_headers is None:
            required_headers = ["(request-target)", "date"]

        self.required_headers = required_headers
        self.require_digest = require_digest
        self.key_resolver = None
        self.max_delay_time = max_delay_time

    def resolve_key(self, func):
        """Register ``func`` as the key resolver; usable as a decorator.

        ``func`` is called as ``func(key_id=...)`` and must return the
        Base64-encoded secret key, or ``None`` when the key ID is unknown.
        """
        self.key_resolver = func
        return func

    def signed_headers_required(self, f):
        """Decorate view ``f`` so that it requires a valid signature."""
        return self.login_required(f)

    @staticmethod
    def _get_bytes_to_sign(headers):
        """Build the signing string for the current request.

        Args:
            headers: Lower-cased header names, in the order listed in the
                signature's ``headers`` parameter.

        Returns:
            The newline-joined ``name: value`` lines, encoded to bytes.

        Raises:
            HTTPSignatureInvalid: If a listed header is absent from the
                request, since a signature over a header that was not sent
                cannot be verified.
        """
        result = []
        for header in headers:
            if header == "(request-target)":
                # Include the query string only when one was actually sent.
                path_url = request.full_path if request.query_string else request.path
                result.append(f"(request-target): {request.method.lower()} {path_url}")
                continue

            if header == "host":
                value = request.headers.get("host", request.host)
            else:
                value = request.headers.get(header)

            if value is None:
                logging.warning("Signed header `%s` is missing from request.", header)
                raise HTTPSignatureInvalid()
            result.append(f"{header}: {value}")

        return "\n".join(result).encode()

    def authenticate(self, auth: Authorization, _pw):
        """Verify the signature on the current request.

        Args:
            auth: Parsed ``Authorization`` header, or ``None`` if absent.
            _pw: Unused; present to match the ``HTTPAuth`` interface.

        Returns:
            ``True`` when the signature, digest and date all check out.

        Raises:
            HTTPSignatureInvalid: On any missing, malformed or mismatching
                signature, digest or date.
            RuntimeError: If no key resolver has been registered.
        """
        # Get the current time as early as possible
        authentication_time = time.time()
        if auth is None:
            logging.warning("No authentication provided.")
            raise HTTPSignatureInvalid()

        # An explicit raise rather than `assert`, so the check still runs
        # when Python is started with -O.
        if self.key_resolver is None:
            raise RuntimeError(
                "Key resolver should be set before authenticating request."
            )

        sig_dict = auth.parameters
        for field in ("keyId", "algorithm", "signature"):
            if field not in sig_dict:
                logging.warning("Malformed authorisation header.")
                raise HTTPSignatureInvalid()

        if sig_dict["algorithm"] not in {"hmac-sha256", "hs2019"}:
            logging.warning(
                "Unsupported signature algorithm: %s", sig_dict["algorithm"]
            )
            raise HTTPSignatureInvalid()

        # We deviate from the spec here, which says the default should be '(created)'. However, this is only valid
        # for asymmetric signatures, which we don't support.
        headers = [
            header.lower() for header in sig_dict.get("headers", "date").split(" ")
        ]

        for header in self.required_headers:
            if header not in headers:
                logging.warning("Missing required header `%s` in signature.", header)
                raise HTTPSignatureInvalid()

        if self.require_digest and request.data:
            if "digest" not in headers:
                logging.warning("Missing required header `digest` in signature.")
                raise HTTPSignatureInvalid()

            encoded_digest = base64.b64encode(
                hashlib.sha256(request.data).digest()
            ).decode()

            # `.get` so that an absent header is rejected as a mismatch
            # below instead of raising a lookup error.
            expected_digest = request.headers.get("digest")
            computed_digest = f"SHA-256={encoded_digest}"
            if expected_digest != computed_digest:
                logging.warning(
                    "Digest header does not match request body.\n"
                    "    Expected: %s\n"
                    "    Computed: %s",
                    expected_digest,
                    computed_digest,
                )
                raise HTTPSignatureInvalid()

        if "date" in headers:
            date_header = request.headers.get("date")
            # parsedate returns None for input it cannot parse.
            parsed_date = parsedate(date_header) if date_header else None
            if parsed_date is None:
                logging.warning("Date header on request is missing or malformed.")
                raise HTTPSignatureInvalid()

            # The struct_time returned by parsedate will be converted to epoch
            # time using the system TZ, so we use calendar.timegm() to ensure
            # it's consistently UTC
            try:
                supplied_date = calendar.timegm(parsed_date)
            except (ValueError, OverflowError):
                logging.warning("Date header on request is missing or malformed.")
                raise HTTPSignatureInvalid() from None

            # Require supplied date to be close to the current time
            if abs(authentication_time - supplied_date) > self.max_delay_time:
                logging.warning("Date on request too far away from current time.")
                raise HTTPSignatureInvalid()

        try:
            expected_signature = base64.b64decode(sig_dict["signature"].encode())
        except ValueError:
            # binascii.Error (bad padding etc.) is a ValueError subclass.
            logging.warning("Signature on request is not valid Base64.")
            raise HTTPSignatureInvalid() from None

        bytes_to_sign = self._get_bytes_to_sign(headers)

        # Check for an unknown key *before* decoding: b64decode(None) would
        # raise TypeError and the rejection below would never be reached.
        encoded_key = self.key_resolver(key_id=sig_dict["keyId"])
        if encoded_key is None:
            logging.warning(
                "Unknown key ID `%s` when verifying signature.", sig_dict["keyId"]
            )
            raise HTTPSignatureInvalid()
        key = base64.b64decode(encoded_key)

        computed_signature = hmac.new(
            key, bytes_to_sign, digestmod=hashlib.sha256
        ).digest()

        # Constant-time comparison to avoid leaking signature bytes via timing.
        signature_valid = hmac.compare_digest(expected_signature, computed_signature)
        if not signature_valid:
            logging.warning("Signature on request does not match expected signature.")
            raise HTTPSignatureInvalid()

        return True

TypeScript code example 

import * as crypto from 'crypto';
import * as express from 'express';
import * as base64 from 'base-64';
import * as utf8 from '@stablelib/utf8';

/** Error thrown when a request's HTTP signature cannot be verified. */
class HTTPSignatureInvalid extends Error {
    // Identifies this error type in logs and stack traces.
    name = "HTTPSignatureInvalid";

    constructor(message?: string) {
        super(message);
    }
}

/**
 * Verifies HMAC-SHA256 HTTP signatures on incoming Express requests.
 *
 * A key resolver must be registered with `resolveKey` before calling
 * `authenticate`. Every verification failure throws `HTTPSignatureInvalid`.
 */
class HTTPSignatureAuth {
    scheme: string;
    realm: string | null;
    requiredHeaders: string[];
    requireDigest: boolean;
    keyResolver: any;
    maxDelayTime: number;

    /**
     * @param scheme Authorization scheme name.
     * @param realm Optional realm.
     * @param requiredHeaders Header names the signature must cover.
     * @param requireDigest When true, requests with a body must sign a matching `digest` header.
     * @param maxDelayTime Maximum allowed clock difference, in seconds, for the `date` header.
     */
    constructor(scheme = "Signature", realm: string | null = null, requiredHeaders = ["(request-target)", "date"], requireDigest = true, maxDelayTime = 30) {
        this.scheme = scheme;
        this.realm = realm;
        this.requiredHeaders = requiredHeaders;
        this.requireDigest = requireDigest;
        this.keyResolver = null;
        this.maxDelayTime = maxDelayTime;
    }

    /**
     * Registers the key resolver. It is called with the key ID and must return
     * the Base64-encoded secret key, or null/undefined for an unknown key ID.
     */
    resolveKey(func: any) {
        this.keyResolver = func;
        return func;
    }

    /**
     * Returns the raw request body as a Buffer, or null when there is no body.
     *
     * NOTE(review): the digest must be computed over the exact bytes received,
     * so this expects `req.body` to be a Buffer or string (e.g. from a raw/text
     * body parser). Some body parsers appear to leave an empty object when no
     * body was sent; that is treated as "no body" here — confirm against the
     * parser in use.
     */
    static _getRawBody(req: express.Request): Buffer | null {
        const body: any = (req as any).body;
        if (body === undefined || body === null) {
            return null;
        }
        if (Buffer.isBuffer(body)) {
            return body.length > 0 ? body : null;
        }
        if (typeof body === "string") {
            return body.length > 0 ? Buffer.from(body, "utf8") : null;
        }
        if (typeof body === "object" && Object.keys(body).length === 0) {
            return null;
        }
        throw new Error("Request body must be available as raw bytes (Buffer or string) to verify the digest.");
    }

    /**
     * Builds the signing string for `req`.
     *
     * @param headers Lower-cased header names, in signature order.
     * @param req The request being verified. Defaults to `express.request` for
     *            backward compatibility, but callers should pass the live request.
     * @throws HTTPSignatureInvalid if a listed header is absent from the request.
     */
    static _getBytesToSign(headers: string[], req: express.Request = express.request) {
        const lines: string[] = [];
        for (const header of headers) {
            if (header === "(request-target)") {
                // originalUrl keeps the query string, so the request target is
                // the full path of the request as received.
                lines.push(`(request-target): ${req.method.toLowerCase()} ${req.originalUrl}`);
                continue;
            }
            const value = req.headers[header];
            if (value === undefined) {
                console.warn(`Signed header ${header} is missing from request.`);
                throw new HTTPSignatureInvalid();
            }
            lines.push(`${header}: ${Array.isArray(value) ? value.join(", ") : value}`);
        }
        return Buffer.from(lines.join("\n"), "utf8");
    }

    /**
     * Verifies the signature, digest and date of a request.
     *
     * @param auth Parsed Authorization header exposing a `parameters` object.
     * @param _pw Unused.
     * @param req The request being verified. Defaults to `express.request` for
     *            backward compatibility, but callers should pass the live request.
     * @returns true when verification succeeds.
     * @throws HTTPSignatureInvalid on any verification failure.
     * @throws Error if no key resolver has been registered.
     */
    authenticate(auth: any, _pw: any, req: express.Request = express.request) {
        // Get the current time as early as possible
        const authenticationTime = Date.now() / 1000;
        if (auth === null || auth === undefined) {
            console.warn("No authentication provided.");
            throw new HTTPSignatureInvalid();
        }
        if (this.keyResolver === null) {
            throw new Error("Key resolver should be set before authenticating request.");
        }
        const sigDict = auth.parameters;
        for (const field of ["keyId", "algorithm", "signature"]) {
            if (!(field in sigDict)) {
                console.warn("Malformed authorisation header.");
                throw new HTTPSignatureInvalid();
            }
        }
        if (!["hmac-sha256", "hs2019"].includes(sigDict["algorithm"])) {
            console.warn(`Unsupported signature algorithm: ${sigDict["algorithm"]}`);
            throw new HTTPSignatureInvalid();
        }
        // Default to signing only `date` when no header list is supplied.
        const headerList: string = typeof sigDict["headers"] === "string" ? sigDict["headers"] : "date";
        const headers = headerList.split(" ").map((header: string) => header.toLowerCase());
        for (const header of this.requiredHeaders) {
            if (!headers.includes(header)) {
                console.warn(`Missing required header ${header} in signature.`);
                throw new HTTPSignatureInvalid();
            }
        }
        const rawBody = this.requireDigest ? HTTPSignatureAuth._getRawBody(req) : null;
        if (rawBody !== null) {
            if (!headers.includes("digest")) {
                console.warn("Missing required header `digest` in signature.");
                throw new HTTPSignatureInvalid();
            }
            const encodedDigest = crypto.createHash('sha256').update(rawBody).digest('base64');
            const expectedDigest = req.headers["digest"];
            const computedDigest = `SHA-256=${encodedDigest}`;
            if (expectedDigest !== computedDigest) {
                console.warn(`Digest header does not match request body.\nExpected: ${expectedDigest}\nComputed: ${computedDigest}`);
                throw new HTTPSignatureInvalid();
            }
        }
        if (headers.includes("date")) {
            const dateHeader = req.headers["date"];
            const suppliedDate = typeof dateHeader === "string" ? Date.parse(dateHeader) / 1000 : NaN;
            // Date.parse yields NaN for unparseable input, and any comparison
            // with NaN is false, so NaN must be rejected explicitly.
            if (isNaN(suppliedDate)) {
                console.warn("Date header on request is missing or malformed.");
                throw new HTTPSignatureInvalid();
            }
            if (Math.abs(authenticationTime - suppliedDate) > this.maxDelayTime) {
                console.warn("Date on request too far away from current time.");
                throw new HTTPSignatureInvalid();
            }
        }
        const expectedSignature = Buffer.from(String(sigDict["signature"]), 'base64');
        const bytesToSign = HTTPSignatureAuth._getBytesToSign(headers, req);
        // Reject unknown key IDs before attempting to decode the key.
        const encodedKey = this.keyResolver(sigDict["keyId"]);
        if (encodedKey === null || encodedKey === undefined) {
            console.warn(`Unknown key ID ${sigDict["keyId"]} when verifying signature.`);
            throw new HTTPSignatureInvalid();
        }
        const key = Buffer.from(encodedKey, 'base64');
        const computedSignature = crypto.createHmac('sha256', key).update(bytesToSign).digest();
        // timingSafeEqual throws when lengths differ, so compare lengths first.
        const signatureValid = expectedSignature.length === computedSignature.length
            && crypto.timingSafeEqual(expectedSignature, computedSignature);
        if (!signatureValid) {
            console.warn("Signature on request does not match expected signature.");
            throw new HTTPSignatureInvalid();
        }
        return true;
    }
}

Additional information