Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature GenAI #43

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
91 changes: 89 additions & 2 deletions pyalgotrading/algobulls/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,20 @@
import requests

from .exceptions import AlgoBullsAPIBaseException, AlgoBullsAPIUnauthorizedErrorException, AlgoBullsAPIInsufficientBalanceErrorException, AlgoBullsAPIResourceNotFoundErrorException, AlgoBullsAPIBadRequestException, \
AlgoBullsAPIInternalServerErrorException, AlgoBullsAPIForbiddenErrorException, AlgoBullsAPIGatewayTimeoutErrorException
AlgoBullsAPIInternalServerErrorException, AlgoBullsAPIForbiddenErrorException, AlgoBullsAPIGatewayTimeoutErrorException, AlgoBullsAPITooManyRequestsException
from ..constants import TradingType, TradingReportType
from ..utils.func import get_raw_response

GENAI_SESSION_SIZE = 100
GENAI_SESSION_HISTORY_SIZE = 100


class AlgoBullsAPI:
"""
AlgoBulls API
"""
SERVER_ENDPOINT = 'https://api.algobulls.com/'
SERVER_ENDPOINT = 'http://localhost:7003/'
# SERVER_ENDPOINT = 'https://api.algobulls.com/'

def __init__(self, connection):
"""
Expand All @@ -30,6 +34,8 @@ def __init__(self, connection):
self.__key_papertrading = {} # strategy-cstc_id mapping
self.__key_realtrading = {} # strategy-cstc_id mapping
self.pattern = re.compile(r'(?<!^)(?=[A-Z])')
self.genai_session_id = None
self.genai_sessions_map = None

def __convert(self, _dict):
# Helps convert _dict keys from camelcase to snakecase
Expand Down Expand Up @@ -91,6 +97,9 @@ def _send_request(self, method: str = 'get', endpoint: str = '', base_url: str =
elif r.status_code == 404:
r.raw.decode_content = True
raise AlgoBullsAPIResourceNotFoundErrorException(method=method, url=url, response=get_raw_response(r), status_code=404)
elif r.status_code == 429:
r.raw.decode_content = True
raise AlgoBullsAPITooManyRequestsException(method=method, url=url, response=get_raw_response(r), status_code=429)
elif r.status_code == 500:
r.raw.decode_content = True
raise AlgoBullsAPIInternalServerErrorException(method=method, url=url, response=get_raw_response(r), status_code=500)
Expand Down Expand Up @@ -473,3 +482,81 @@ def get_reports(self, strategy_code: str, trading_type: TradingType, report_type
response = self._send_request(endpoint=endpoint, params=params)

return response

def set_genai_api_key(self, genai_api_key):
om-khade-algobulls marked this conversation as resolved.
Show resolved Hide resolved
endpoint = 'v1/build/python/genai/key'
json_data = {"openaiApiKey": genai_api_key}
response = self._send_request(method='post', endpoint=endpoint, json_data=json_data)
return response

def get_genai_api_key_status(self):
om-khade-algobulls marked this conversation as resolved.
Show resolved Hide resolved
endpoint = f'v1/build/python/genai/key'
response = self._send_request(endpoint=endpoint)
return response

def get_genai_response(self, user_prompt: str, chat_gpt_model: str = ''):
"""
Fetch GenAI response.

Args:
user_prompt: User question
chat_gpt_model: Chat gpt model name
Returns:
GenAI response

Info: ENDPOINT
`GET` v1/build/python/genai Get GenAI response
"""
endpoint = 'v1/build/python/genai'
params = {"userPrompt": user_prompt, 'sessionId': self.genai_session_id, 'chatGPTModel': chat_gpt_model}

try:
response = self._send_request(endpoint=endpoint, params=params)
if self.genai_session_id is None and 'session_id' in response:
self.genai_session_id = response['session_id']
except (AlgoBullsAPIResourceNotFoundErrorException, AlgoBullsAPIForbiddenErrorException, AlgoBullsAPIBadRequestException, AlgoBullsAPITooManyRequestsException) as ex:
print('\nFail.')
print(f'{ex.get_error_type()}: {ex.response}')
response = None

return response

def handle_genai_response_timeout(self):
"""
Fetch GenAI response.

Args:
Returns:
GenAI response for current session. Last active session is used when session_id is None.

Info: ENDPOINT
`GET` v1/build/python/genai/response Pooling API to get response in case of timeout
"""
endpoint = 'v1/build/python/genai/response'
params = {'session_id': self.genai_session_id}

try:
response = self._send_request(endpoint=endpoint, params=params)
if self.genai_session_id is None and 'session_id' in response:
self.genai_session_id = response['session_id']
except AlgoBullsAPIResourceNotFoundErrorException as ex:
print('\nFail.')
print(f'{ex.get_error_type()}: {ex.response}')
response = None

return response

def get_genai_sessions(self):
endpoint = 'v1/build/python/genai/sessions'
om-khade-algobulls marked this conversation as resolved.
Show resolved Hide resolved
params = {'session_id': self.genai_session_id, 'pageSize': GENAI_SESSION_SIZE}
response = self._send_request(endpoint=endpoint, params=params)
self.genai_sessions_map = response['data']

return response['data']

def get_genai_session_history(self, session_id):
endpoint = 'v1/build/python/genai/session/history'
om-khade-algobulls marked this conversation as resolved.
Show resolved Hide resolved
params = {'sessionId': self.genai_sessions_map[session_id - 1]['id'], 'pageSize': GENAI_SESSION_HISTORY_SIZE}
response = self._send_request(endpoint=endpoint, params=params)

return response['data']
152 changes: 143 additions & 9 deletions pyalgotrading/algobulls/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@
from tqdm.auto import tqdm

from .api import AlgoBullsAPI
from .exceptions import AlgoBullsAPIBadRequestException, AlgoBullsAPIGatewayTimeoutErrorException
from .exceptions import AlgoBullsAPIBadRequestException, AlgoBullsAPIGatewayTimeoutErrorException, AlgoBullsAPIForbiddenErrorException
from ..constants import StrategyMode, TradingType, TradingReportType, CandleInterval, AlgoBullsEngineVersion, Country, ExecutionStatus, EXCHANGE_LOCALE_MAP, Locale
from ..strategy.strategy_base import StrategyBase
from ..utils.func import get_valid_enum_names, get_datetime_with_tz

GENAI_RESPONSE_POOLING_LIMIT = 20


class AlgoBullsConnection:
"""
Expand All @@ -31,10 +33,7 @@ def __init__(self):
"""
self.api = AlgoBullsAPI(self)

self.saved_parameters = {
'start_timestamp_map': {},
'end_timestamp_map': {}
}
self.saved_parameters = {"start_timestamp_map": {}, "end_timestamp_map": {}}
om-khade-algobulls marked this conversation as resolved.
Show resolved Hide resolved

self.strategy_country_map = {
TradingType.BACKTESTING: {},
Expand All @@ -45,6 +44,7 @@ def __init__(self):
self.backtesting_pnl_data = None
self.papertrade_pnl_data = None
self.realtrade_pnl_data = None
self.recent_genai_response = None

@staticmethod
def get_authorization_url():
Expand All @@ -54,8 +54,8 @@ def get_authorization_url():
Returns:
Authorization URL
"""
url = 'https://app.algobulls.com/user/login'
print(f'Please login to this URL with your AlgoBulls credentials and get your developer access token: {url}')
url = "https://app.algobulls.com/user/login"
print(f"Please login to this URL with your AlgoBulls credentials and get your developer access token: {url}")

@staticmethod
def get_token_url():
Expand All @@ -65,8 +65,8 @@ def get_token_url():
Returns:
Token URL
"""
url = 'https://app.algobulls.com/settings?section=developerOptions'
print(f'Please login to this URL to get your unique token: {url}')
url = "https://app.algobulls.com/settings?section=developerOptions"
print(f"Please login to this URL to get your unique token: {url}")

def set_access_token(self, access_token):
"""
Expand All @@ -81,6 +81,140 @@ def set_access_token(self, access_token):
assert isinstance(access_token, str), f'Argument "access_token" should be a string'
self.api.set_access_token(access_token)

def set_generative_ai_keys(self, genai_api_key):
"""
Set the API keys of the generative AI took used

Args:
genai_api_key: GenAI API key
"""
assert isinstance(genai_api_key, str), f'Argument "api_key" should be a string'
self.api.set_genai_api_key(genai_api_key)

def get_genai_response_pooling(self, no_of_tries, user_prompt=None, chat_gpt_model=None):
om-khade-algobulls marked this conversation as resolved.
Show resolved Hide resolved
if no_of_tries < GENAI_RESPONSE_POOLING_LIMIT:
try:
if no_of_tries > 1:
response = self.api.handle_genai_response_timeout()
else:
response = self.api.get_genai_response(user_prompt, chat_gpt_model)

except AlgoBullsAPIGatewayTimeoutErrorException:
response = self.get_genai_response_pooling(no_of_tries + 1)
else:
response = {"message": "Somthing went wrong please try again"}
return response

def display_genai_sessions(self):
"""
display previous sessions
Returns:
available sessions
"""
customer_genai_sessions = self.api.get_genai_sessions()
df = pd.DataFrame(customer_genai_sessions)
df.index += 1

if customer_genai_sessions:
df.drop(columns=["id"], inplace=True)
print(tabulate(df, headers=["id", "Timestamp Created", "Title"], tablefmt="pretty"))

return customer_genai_sessions

def continue_from_previous_sessions(self):
customer_genai_sessions = self.display_genai_sessions()
om-khade-algobulls marked this conversation as resolved.
Show resolved Hide resolved
while True:
user_input = int(input("Enter session number"))
if not isinstance(user_input, int):
print('Argument "user_input" should be a int')
elif 1 <= int(user_input) <= len(customer_genai_sessions):
selected_session_index = int(user_input) - 1
selected_session_id = customer_genai_sessions[selected_session_index][0]
self.api.genai_session_id = selected_session_id
break
else:
print("Please select a valid session number.")

def display_session_chat_history(self, session_id):
if not self.api.genai_sessions_map:
om-khade-algobulls marked this conversation as resolved.
Show resolved Hide resolved
self.api.get_genai_sessions()

customer_genai_session_history = self.api.get_genai_session_history(session_id)
if customer_genai_session_history:
for chat in customer_genai_session_history[::-1]:
print(f"User:\n{chat['user_prompt']}", end="\n\n")
print(f"GenAI:\n{chat['genai_response']}", end=f"\n\n{'-' * 50}\n\n")
else:
print(f"No available chat history for session id: {session_id}")

def start_chat(self, start_fresh=None, session_id=None, chat_gpt_model=None):
om-khade-algobulls marked this conversation as resolved.
Show resolved Hide resolved
response = self.api.get_genai_api_key_status()
assert response['key_available'], f"Please set your GenAI key using set_generative_ai_keys()"

# This will reset the session_id
if start_fresh:
# reset session
self.api.genai_session_id = None
elif start_fresh is not None:
if session_id:
if self.api.genai_sessions_map is not None:
self.api.get_genai_sessions()
assert session_id in self.api.genai_sessions_map, f"Please selecta valid session id."
self.api.genai_session_id = self.api.genai_sessions_map[session_id - 1][0]
else:
self.continue_from_previous_sessions()

print("Session Start", end="\n\n")
while True:
print("Enter 'Save' to save the Strategy. Enter 'Exit' to exit the session.", end="\r")
user_prompt = str(input("Enter query: ")).lower()
if user_prompt:
if user_prompt == "exit":
print("Session End")
return

if user_prompt == 'save':
strategy_name = str(input("Input a Strategy Name"))
return self.save_last_generated_strategy(strategy_name=strategy_name)
return

print("Please wait your request is being precessed.", end='\r')
response = self.get_genai_response_pooling(1, user_prompt, chat_gpt_model)
if not response:
break

self.recent_genai_response = response['message']
print(f"\nGenAI: {response['message']}", end=f"\n\n{'-' * 50}\n\n")

def save_last_generated_strategy(self, strategy_name=None, strategy_code=None):
om-khade-algobulls marked this conversation as resolved.
Show resolved Hide resolved
if self.recent_genai_response or strategy_code:
strategy_name = strategy_name or f'GenAI Strategy-{time.time():.0f}'
strategy_details = strategy_code or self.recent_genai_response

pattern = r"```python\n(.*?)\n```\n"
code_matches = re.findall(pattern, strategy_details, re.DOTALL)

if not code_matches:
print(strategy_details)
print("Do you want to save the following strategy? (Yes/No)")

while True:
user_response = input().lower()
if user_response == 'yes':
break
elif user_response == 'no':
return
else:
strategy_details = code_matches[0]

response = self.api.create_strategy(strategy_name=strategy_name, strategy_details=strategy_details, abc_version='3.3.3')
if response:
self.recent_genai_response = None
return response

else:
print("Warning: Please generate a strategy using GenAI before saving.")

def create_strategy(self, strategy, overwrite=False, strategy_code=None, abc_version=None):
"""
Method to upload new strategy.
Expand Down
9 changes: 9 additions & 0 deletions pyalgotrading/algobulls/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,15 @@ def get_error_type(self):
return 'Resource Not Found'


class AlgoBullsAPITooManyRequestsException(AlgoBullsAPIBaseException):
"""
Exception class for HTTP status code of 429 (Too Many Requests)
"""

def get_error_type(self):
return 'Too Many Requests'


class AlgoBullsAPIInternalServerErrorException(AlgoBullsAPIBaseException):
"""
Exception class for HTTP status code of 500 (Internal Server Error)
Expand Down