Define basic Client for Vue Apps API

This commit is contained in:
havlong
2022-12-20 17:47:51 +03:00
parent 30b137de3a
commit 6697a7b80e

View File

@@ -1,10 +1,81 @@
from time import time
from typing import Optional, Any, Dict
from oauthlib.oauth2 import BackendApplicationClient
from requests_oauthlib import OAuth2Session
vue_apps_url: str = 'https://apps-vue.ru'
default_endpoints: Dict[str, str] = {'access': '/auth', 'refresh': '/refresh',
'ping': '/ping', 'endpoints': '/endpoints'}
class ClientAPI:
client_id: str
client_secret: str
token: Optional[Any]
session: OAuth2Session
endpoints: Dict[str, str]
def build_url(self, endpoint: str) -> str:
return f'{self.vue_apps_url}{self.endpoints[endpoint]}'
def __init__(self, client_id: str, client_secret: str, vue_url: str = vue_apps_url):
"""
Constructor of Vue Apps Client API
Use this object to make requests to our API with ease
:param client_id: provided to you in the process of App Creation at Vue Developer Portal
:param client_secret: non-shareable secret from Developer Portal that provides you with access to our API
:param vue_url: custom URL of Vue Apps API to use
"""
self.vue_apps_url = vue_url
self.endpoints = default_endpoints
def __init__(self, client_id: str, client_secret: str):
self.client_id = client_id
self.client_secret = client_secret
def get_token(self) -> str:
return self.client_secret
oauth = OAuth2Session(client=BackendApplicationClient(client_id=self.client_id))
client_token = oauth.fetch_token(token_url=self.build_url('access'), client_id=self.client_id,
client_secret=self.client_secret)
self.token = client_token
self.setup_flow()
self.update_endpoints()
def setup_flow(self):
"""
Method sets up the client OAuth2 Session, which will be refreshing tokens for you automatically on requests
"""
def save_token(token):
if token:
self.token = token
auto_refresh_info = {'client_id': self.client_id, 'client_secret': self.client_secret}
self.session = OAuth2Session(token=self.token, client_id=self.client_id,
auto_refresh_url=self.build_url('refresh'),
auto_refresh_kwargs=auto_refresh_info, token_updater=save_token)
def acquire_token(self) -> Optional[Any]:
"""
Get available Access Token to send requests with OAuth2
:return: Access Token object
"""
return self.session.access_token
def refresh_token(self) -> bool:
"""
Forces token to be refreshed and sends requests to /ping
:return: True when /ping sent HTTP 200 OK else False
"""
self.token['expires_at'] = time() - 10
self.session.token = self.token
response = self.session.get(self.build_url('ping'))
return response.status_code == 200
def update_endpoints(self):
"""
Refresh endpoints object used to determine path in queries
"""
self.endpoints = self.session.get(self.build_url('endpoints')).json()