Access token renewal
When you are developing an application that run for longer than the access
token lifetime, you should handle the token renewal. To know when the token
should be renewed, the expires_in property in the response of the token
endpoint should be used.
A good practice is to use the same token for multiple requests until it is close to expire.
The following code sample implements the access token renewal 10 seconds before its expiration.
- TypeScript
- JavaScript
- Bash
- Python
import axios, { InternalAxiosRequestConfig } from 'axios';
import { Issuer, TokenSet } from 'openid-client'
const authenticator = (client_id: string, client_secret: string, token_endpoint: string) => {
    let tokenSet: null | TokenSet = null;
    const issuer = new Issuer({
        issuer: "",
        token_endpoint,
    });
    const client = new issuer.Client({
        client_id,
        client_secret,
    });
    return async (config: InternalAxiosRequestConfig) => {
        if (tokenSet == null || (tokenSet.expires_in ?? 0) < 10) {
            tokenSet = await client.grant({
                grant_type: 'client_credentials'
            });
        }
        config.headers.Authorization = `Bearer ${tokenSet.access_token}`;
        return config;
    }
}
const instance = axios.create({
    baseURL: 'https://<platform-hostname>',
});
instance.interceptors.request.use(authenticator(
    '<client-id>',
    '<client-secret>',
    "https://<platform-hostname>/users-auth/protocol/openid-connect/token",
));
const devices = await instance.get('/thingpark/wireless/rest/subscriptions/mine/devices');
import axios from 'axios';
import { Issuer } from 'openid-client';
const authenticator = (client_id, client_secret, token_endpoint) => {
    let tokenSet = null;
    const issuer = new Issuer({
        issuer: "",
        token_endpoint,
    });
    const client = new issuer.Client({
        client_id,
        client_secret,
    });
    return async (config) => {
        if (tokenSet == null || (tokenSet.expires_in ?? 0) < 10) {
            tokenSet = await client.grant({
                grant_type: 'client_credentials'
            });
        }
        config.headers.Authorization = `Bearer ${tokenSet.access_token}`;
        return config;
    };
};
const instance = axios.create({
    baseURL: 'https://<platform-hostname>',
});
instance.interceptors.request.use(authenticator(
    '<client-id>',
    '<client-secret>',
    "https://<platform-hostname>/users-auth/protocol/openid-connect/token",
));
const devices = await instance.get('/thingpark/wireless/rest/subscriptions/mine/devices');
set -eux -o pipefail
# GLOBALS: client_id, client_secret, token_endpoint, _access_token, _expires_at
client_id='<client-id>'
client_secret='<client-secret>'
token_endpoint="https://<platform-hostname>/users-auth/protocol/openid-connect/token"
token() {
  : "${_expires_at:="0"}"
  if [[ $(($_expires_at - $(date '+%s'))) < 30 ]]; then
    local json_response
    json_response=$(curl \
      -s --show-error --fail \
      -d "client_id=$client_id" \
      -d "client_secret=$client_secret" \
      -d "grant_type=client_credentials" \
      $token_endpoint)
    local expires_in
    expires_in=$(echo "$json_response" | jq -r '.expires_in')
    _expires_at=$(($(date '+%s') + $expires_in))
    _access_token=$(echo "$json_response" | jq -r '.access_token')
  fi
  echo $_access_token
}
curl \
   -H "Authorization: Bearer $(token)" \
   -H "Accept: application/json" \
   -s --show-error --fail \
   https://<platform-hostname>/thingpark/wireless/rest/subscriptions/mine/devices | jq
import requests
import time
class OauthSession(requests.Session):
    def __init__(self, client_id, client_secret, token_url, **kwargs):
        super(OauthSession, self).__init__(**kwargs)
        self.client_id = client_id
        self.client_secret = client_secret
        self.token_url = token_url
        self._access_token: None | str = None
        self._expires_at = None
    def issue_token(self):
        token = (
            super(OauthSession, self)
            .request(
                "POST",
                self.token_url,
                data={
                    "client_id": self.client_id,
                    "client_secret": self.client_secret,
                    "grant_type": "client_credentials",
                },
                headers={"content-type": "application/x-www-form-urlencoded"},
            )
            .json()
        )
        self._access_token = token["access_token"]
        self._expires_at = round(time.time()) + token["expires_in"]
    def expires_in(self):
        if self._expires_at is None:
            return 0
        return self._expires_at - time.time()
    def request(self, method, url, headers=None, **kwargs):
        if self.expires_in() < 10:
            self.issue_token()
        headers = headers or {}
        headers["Authorization"] = "Bearer %s" % self._access_token
        return super(OauthSession, self).request(method, url, headers=headers, **kwargs)
hostname = "<platform-hostname>"
session = OauthSession(
    client_id="<client-id>",
    client_secret="<client-secret>",
    token_url=f"https://{hostname}/users-auth/protocol/openid-connect/token",
)
devices = session.get(
    f"https://{hostname}/thingpark/wireless/rest/subscriptions/mine/devices",
    headers={"Accept": "application/json"},
).json()