Renouvellement du jeton d'accès
Lorsque vous développez une application dont l'exécution dépasse la durée de vie du jeton d'accès, vous devez gérer le renouvellement de ce jeton. Pour savoir quand le renouveler, utilisez la propriété `expires_in` renvoyée dans la réponse du point de terminaison du jeton.
Une bonne pratique consiste à utiliser le même jeton pour plusieurs requêtes jusqu'à ce qu'il soit proche de l'expiration.
L'exemple de code suivant implémente le renouvellement du jeton d'accès 10 secondes avant son expiration.
- TypeScript
- JavaScript
- Bash
- Python
import axios, { InternalAxiosRequestConfig } from 'axios';
import { Issuer, TokenSet } from 'openid-client'
/**
 * Creates an axios request interceptor that adds an OAuth2 bearer token,
 * obtained with the client-credentials grant, to every outgoing request.
 *
 * The token set is cached in the closure and reused until its reported
 * remaining lifetime (`expires_in`) drops below 10 seconds, at which point a
 * new one is requested from the token endpoint.
 *
 * @param client_id      OAuth2 client identifier.
 * @param client_secret  OAuth2 client secret.
 * @param token_endpoint URL of the token endpoint.
 * @returns An async interceptor that sets `config.headers.Authorization`.
 */
const authenticator = (client_id: string, client_secret: string, token_endpoint: string) => {
  // Only the token endpoint is needed for this grant, so the issuer
  // identifier is left empty.
  const tokenIssuer = new Issuer({ issuer: "", token_endpoint });
  const oauthClient = new tokenIssuer.Client({ client_id, client_secret });

  // Cached token set, shared by every request going through the interceptor.
  let tokenSet: TokenSet | null = null;

  return async function attachBearerToken(config: InternalAxiosRequestConfig) {
    // Request a token on first use, or when the cached one is about to expire.
    if (tokenSet == null || (tokenSet.expires_in ?? 0) < 10) {
      tokenSet = await oauthClient.grant({ grant_type: 'client_credentials' });
    }

    config.headers.Authorization = `Bearer ${tokenSet.access_token}`;
    return config;
  };
};
// Axios client bound to the platform host.
const instance = axios.create({ baseURL: 'https://<platform-hostname>' });

// Build the token interceptor once, then register it so that every request
// made through `instance` carries a bearer token.
const bearerTokenInterceptor = authenticator(
  '<client-id>',
  '<client-secret>',
  "https://<platform-hostname>/users-auth/protocol/openid-connect/token",
);
instance.interceptors.request.use(bearerTokenInterceptor);

// Example authenticated call.
const devices = await instance.get('/thingpark/wireless/rest/subscriptions/mine/devices');
import axios from 'axios';
import { Issuer } from 'openid-client';
/**
 * Creates an axios request interceptor that adds an OAuth2 bearer token,
 * obtained with the client-credentials grant, to every outgoing request.
 *
 * The token set is cached in the closure and reused until its reported
 * remaining lifetime (`expires_in`) drops below 10 seconds, at which point a
 * new one is requested from the token endpoint.
 *
 * @param {string} client_id      OAuth2 client identifier.
 * @param {string} client_secret  OAuth2 client secret.
 * @param {string} token_endpoint URL of the token endpoint.
 * @returns An async interceptor that sets `config.headers.Authorization`.
 */
const authenticator = (client_id, client_secret, token_endpoint) => {
  // Only the token endpoint is needed for this grant, so the issuer
  // identifier is left empty.
  const tokenIssuer = new Issuer({ issuer: "", token_endpoint });
  const oauthClient = new tokenIssuer.Client({ client_id, client_secret });

  // Cached token set, shared by every request going through the interceptor.
  let tokenSet = null;

  return async function attachBearerToken(config) {
    // Request a token on first use, or when the cached one is about to expire.
    if (tokenSet == null || (tokenSet.expires_in ?? 0) < 10) {
      tokenSet = await oauthClient.grant({ grant_type: 'client_credentials' });
    }

    config.headers.Authorization = `Bearer ${tokenSet.access_token}`;
    return config;
  };
};
// Axios client bound to the platform host.
const instance = axios.create({ baseURL: 'https://<platform-hostname>' });

// Build the token interceptor once, then register it so that every request
// made through `instance` carries a bearer token.
const bearerTokenInterceptor = authenticator(
  '<client-id>',
  '<client-secret>',
  "https://<platform-hostname>/users-auth/protocol/openid-connect/token",
);
instance.interceptors.request.use(bearerTokenInterceptor);

// Example authenticated call.
const devices = await instance.get('/thingpark/wireless/rest/subscriptions/mine/devices');
# Stop on the first failing command, on any use of an unset variable and on a
# failure anywhere in a pipeline; also trace each command as it runs.
# NOTE(review): tracing prints expanded commands, including the client secret
# assigned below, to stderr -- confirm this is acceptable where the script runs.
set -o errexit -o nounset -o xtrace -o pipefail

# GLOBALS: client_id, client_secret, token_endpoint, _access_token, _expires_at
# The first three are configuration; the last two are the token cache
# maintained by token().
client_id="<client-id>"
client_secret="<client-secret>"
token_endpoint='https://<platform-hostname>/users-auth/protocol/openid-connect/token'
# Prints a valid access token on stdout.
#
# The token is cached in the globals _access_token / _expires_at and a new one
# is requested from $token_endpoint (client-credentials grant) only when the
# cached token has less than 10 seconds of validity left.
#
# Reads:  client_id, client_secret, token_endpoint
# Writes: _access_token, _expires_at
#
# The cache only survives if token() runs in the current shell. When called
# through a command substitution, $(token), the assignments happen in a
# subshell and are discarded.
token() {
  # Remaining validity (seconds) below which the token is renewed.
  local refresh_margin=10
  local now
  now=$(date '+%s')

  # First call: no token yet, so pretend it expired at the epoch.
  : "${_expires_at:=0}"

  # Arithmetic evaluation is required here: inside [[ ]] the `<` operator
  # compares strings, so "100" < "30" would be true and "5" < "30" false.
  if (( _expires_at - now < refresh_margin )); then
    local json_response
    local expires_in

    # --data-urlencode keeps credentials containing reserved characters
    # (&, +, =, %) intact in the form-encoded body.
    json_response=$(curl \
      -s --show-error --fail \
      --data-urlencode "client_id=$client_id" \
      --data-urlencode "client_secret=$client_secret" \
      -d "grant_type=client_credentials" \
      "$token_endpoint")

    expires_in=$(jq -r '.expires_in' <<<"$json_response")
    # Take the time again: the request itself may have taken a while.
    _expires_at=$(( $(date '+%s') + expires_in ))
    _access_token=$(jq -r '.access_token' <<<"$json_response")
  fi

  printf '%s\n' "$_access_token"
}
# Refresh the token cache in the current shell before using it. Calling
# $(token) inside the header would run token() in a subshell, so the
# _access_token / _expires_at it assigns would be lost and every request
# would fetch a brand-new token instead of reusing the cached one.
token > /dev/null

# The URL is quoted so that the `<` and `>` of the placeholder are not parsed
# as shell redirections.
curl \
  -H "Authorization: Bearer $_access_token" \
  -H "Accept: application/json" \
  -s --show-error --fail \
  "https://<platform-hostname>/thingpark/wireless/rest/subscriptions/mine/devices" | jq
import requests
import time
class OauthSession(requests.Session):
    """``requests`` session that authenticates each request with a bearer token.

    The token is obtained from ``token_url`` with the OAuth2 client-credentials
    grant, cached on the session, and renewed when it has less than
    ``REFRESH_MARGIN_SECONDS`` of validity left.
    """

    # Remaining validity (seconds) below which a new token is requested.
    REFRESH_MARGIN_SECONDS = 10

    def __init__(self, client_id, client_secret, token_url, **kwargs):
        """Store the client credentials and start with an empty token cache."""
        super().__init__(**kwargs)
        self.client_id = client_id
        self.client_secret = client_secret
        self.token_url = token_url
        self._access_token: None | str = None
        # Unix timestamp at which the cached token expires; None until the
        # first token has been issued.
        self._expires_at = None

    def issue_token(self):
        """Request a new token and update the cache.

        Raises:
            requests.HTTPError: if the token endpoint answers with an error
                status.
        """
        # Go through the parent implementation: self.request() would try to
        # authenticate the token request itself.
        response = super().request(
            "POST",
            self.token_url,
            data={
                "client_id": self.client_id,
                "client_secret": self.client_secret,
                "grant_type": "client_credentials",
            },
            headers={"content-type": "application/x-www-form-urlencoded"},
        )
        # Fail with the HTTP error instead of a KeyError on the error body.
        response.raise_for_status()
        token = response.json()
        self._access_token = token["access_token"]
        self._expires_at = round(time.time()) + token["expires_in"]

    def expires_in(self):
        """Return the seconds of validity left, or 0 when no token is cached."""
        if self._expires_at is None:
            return 0
        return self._expires_at - time.time()

    def request(self, method, url, headers=None, **kwargs):
        """Send a request with an ``Authorization: Bearer`` header.

        A new token is issued first when none is cached or the cached one is
        about to expire. The ``headers`` mapping passed by the caller is not
        modified.
        """
        if self.expires_in() < self.REFRESH_MARGIN_SECONDS:
            self.issue_token()
        # Work on a copy so the caller's dict does not end up holding a token.
        headers = dict(headers or {})
        headers["Authorization"] = "Bearer %s" % self._access_token
        return super().request(method, url, headers=headers, **kwargs)
# Platform host used for both the token endpoint and the API.
hostname = "<platform-hostname>"

token_url = f"https://{hostname}/users-auth/protocol/openid-connect/token"
devices_url = f"https://{hostname}/thingpark/wireless/rest/subscriptions/mine/devices"

# Session that adds a bearer token to every request it sends.
session = OauthSession(
    client_id="<client-id>",
    client_secret="<client-secret>",
    token_url=token_url,
)

# Example authenticated call; the JSON body of the response is decoded.
response = session.get(devices_url, headers={"Accept": "application/json"})
devices = response.json()