import json
from soko_api.client import create_client, Client
import logging
import sys
# Root logger setup: INFO and above, "LEVEL: message" layout, written to
# stdout rather than the handler default of stderr.
# NOTE: logging.basicConfig() does nothing if the root logger already has
# handlers when this module is imported.
logging.basicConfig(
    stream=sys.stdout,
    format="%(levelname)s: %(message)s",
    level=logging.INFO,
)
class SokoJob:
    """Base class for a Soko job.

    On construction the job arguments are loaded from a JSON file. Concrete
    jobs override :meth:`Handle` with their logic and may call
    :meth:`InitApi` to obtain an authenticated Soko API client.
    """

    def __init__(self, args):
        """Load the job arguments and set the default job settings.

        Args:
            args: Sequence of command-line style arguments; the LAST element
                is the path of the JSON arguments file.

        Raises:
            IndexError: If ``args`` is empty.
            OSError: If the arguments file cannot be opened.
            json.JSONDecodeError: If the file does not contain valid JSON.
        """
        # The arguments file path is always the last argument.
        params_file = args[-1]
        # JSON is UTF-8 by specification; do not depend on the platform's
        # default encoding (e.g. cp1252 on Windows) when decoding it.
        with open(params_file, encoding="utf-8") as f:
            self.arguments = json.load(f)

        # Default job settings. Presumably subclasses override these to
        # describe the job to the host application -- TODO confirm the exact
        # meaning of each flag against the Soko plugin documentation.
        self.version = "unknown"
        self.label = None
        self.group = None
        self.icon = None
        self.isForProject = False
        self.isForFolder = False
        self.isForVersion = False
        self.isForMultipleObjects = False
        self.askForConfirmation = False
        self.suppressAskForFilesSelect = False
        self.runLocal = False
        self.widget = None
        self.failureDetectionJobErrors = None

    def Handle(self):
        """Run the job. The base implementation does nothing; override it."""
        ...

    def InitApi(self):
        """Create the Soko API client and sign in; store it in ``self.soko_api``.

        Connection settings are read from ``self.arguments["pluginData"]``.
        ``serverURL`` and ``apiKey`` are required. The first credential that
        is present (and truthy) is used, in this order of precedence:

        1. ``apiUser`` (with ``apiUserPassword``) -- password sign-in
        2. ``userApiKey`` -- API key sign-in
        3. ``jwt`` -- JWT sign-in

        Raises:
            KeyError: If ``pluginData``, ``serverURL`` or ``apiKey`` is
                missing from the arguments.
            Exception: If none of the supported credentials is provided.
        """
        plugin_data = self.arguments["pluginData"]
        url: str = plugin_data["serverURL"]
        key: str = plugin_data["apiKey"]
        api_user = plugin_data.get("apiUser")
        api_user_password = plugin_data.get("apiUserPassword")
        user_api_key = plugin_data.get("userApiKey")
        jwt = plugin_data.get("jwt")

        # Fail before creating a client when there is nothing to sign in with.
        if not (api_user or user_api_key or jwt):
            raise Exception(
                "Failed to login into Soko API: pluginData contains none of "
                "'apiUser', 'userApiKey' or 'jwt'."
            )

        # One client for every sign-in method; only the sign-in call differs.
        self.soko_api: Client = create_client(url, key)
        if api_user:
            self.soko_api.sign_in_with_password(api_user, api_user_password)
        elif user_api_key:
            self.soko_api.sign_in_with_api_key(user_api_key)
        else:
            self.soko_api.sign_in_with_jwt(jwt)