From 714ae445330d9e2b3514d60de8c4db3707fdb888 Mon Sep 17 00:00:00 2001 From: Christopher Gerber Date: Wed, 13 Nov 2024 02:38:09 -0800 Subject: [PATCH] wip: context and threading, monitor mentions start and stop functional --- .../cdp_agentkit_core/actions/context.py | 127 ++++++++----- .../actions/social/twitter/__init__.py | 2 +- .../actions/social/twitter/context.py | 96 ++++++++-- .../twitter/mentions_monitor_details.py | 8 +- .../social/twitter/mentions_monitor_start.py | 170 ++++++++++++++---- .../social/twitter/mentions_monitor_stop.py | 25 ++- .../twitter_langchain/twitter_api_wrapper.py | 70 ++++++-- .../twitter_langchain/twitter_toolkit.py | 1 + 8 files changed, 381 insertions(+), 118 deletions(-) diff --git a/cdp-agentkit-core/cdp_agentkit_core/actions/context.py b/cdp-agentkit-core/cdp_agentkit_core/actions/context.py index 00aa116fa..d6789d49f 100644 --- a/cdp-agentkit-core/cdp_agentkit_core/actions/context.py +++ b/cdp-agentkit-core/cdp_agentkit_core/actions/context.py @@ -1,54 +1,83 @@ -from contextvars import ContextVar -from typing import Any +import contextvars +from typing import Any, Optional from pydantic import BaseModel, ConfigDict -class Context(BaseModel): - model_config = ConfigDict(arbitrary_types_allowed=True) - cvars: ContextVar = ContextVar('context_store', default={}) - +class Context(): + """Manages multiple context vars""" def __init__(self): - super().__init__() - - if not self.cvars.get(): - self.cvars.set({}) - - self._tokens: dict[str, Any] = {} - - def create_var(self, name: str, default=None) -> None: - store = self.cvars.get() - - if name not in store: - store[name] = ContextVar(name, default=default) - self.cvars.set(store) - - def get(self, name: str) -> Any: - store = self.cvars.get() - - if name not in store: - raise KeyError(f"Context variable '{name}' not found") - - return store[name].get() - - def set(self, name: str, value: Any) -> None: - store = self.cvars.get() - - if name not in store: - self.create_var(name) - store = self.cvars.get() - - self._tokens[name] = store[name].set(value) - - def reset(self, name: str = None) -> None: - if name is not None: - if name in self._tokens: - self._tokens[name].reset() - del self._tokens[name] - - else: - for token in self._tokens.values(): - token.reset() - - self._tokens.clear() - self.cvars.set({}) + self._vars: Dict[str, contextvars.ContextVar] = {} + + def create(self, key: str, default_value: any = None): + """Create a new context var with given key name""" + if key in self._vars: + raise ValueError(f"{key} already exists") + self._vars[key] = contextvars.ContextVar(f"{key}", default=default_value) + + def set(self, key: str, value: any): + """Set value for a key""" + if key not in self._vars: + self.create(key) + self._vars[key].set(value) + + def get(self, key: str) -> Optional[any]: + """Get current value for a key""" + if key not in self._vars: + # raise ValueError(f"{key} does not exist") + return None + return self._vars[key].get() + + def reset(self, key: str): + """Reset to its default value""" + if key not in self._vars: + raise ValueError(f"{key} does not exist") + self._vars[key].set(None) + + # model_config = ConfigDict(arbitrary_types_allowed=True) + # cvars: ContextVar = ContextVar('context_store', default={}) + + # def __init__(self): + # super(Context, self).__init__() + + # if not self.cvars.get(): + # self.cvars.set({}) + + # self._tokens: dict[str, Any] = {} + + # def create_var(self, name: str, default=None) -> None: + # store = self.cvars.get() + + # if name not in store: + # store[name] = ContextVar(name, default=default) + # self.cvars.set(store) + + # def get(self, name: str) -> Any: + # store = self.cvars.get() + + # if name not in store: + # raise KeyError(f"Context variable '{name}' not found") + + # return store[name].get() + + # def set(self, name: str, value: Any) -> None: + # store = self.cvars.get() + + # if name not in store: + # self.create_var(name) + # store = self.cvars.get() + + # self._tokens[name] = store[name].set(value) + + # def reset(self, name: str = None) -> None: + # if name is not None: + # if name in self._tokens: + # self._tokens[name].reset() + # del self._tokens[name] + + # else: + # for token in self._tokens.values(): + # token.reset() + + # self._tokens.clear() + # self.cvars.set({}) diff --git a/cdp-agentkit-core/cdp_agentkit_core/actions/social/twitter/__init__.py b/cdp-agentkit-core/cdp_agentkit_core/actions/social/twitter/__init__.py index c5b117b6f..9ac420423 100644 --- a/cdp-agentkit-core/cdp_agentkit_core/actions/social/twitter/__init__.py +++ b/cdp-agentkit-core/cdp_agentkit_core/actions/social/twitter/__init__.py @@ -1,5 +1,5 @@ from cdp_agentkit_core.actions.social.twitter.action import Action as TwitterAction -from cdp_agentkit_core.actions.social.twitter.context import Context as TwitterContext +from cdp_agentkit_core.actions.social.twitter.context import TwitterContext as TwitterContext from cdp_agentkit_core.actions.social.twitter.account_details import AccountDetailsAction from cdp_agentkit_core.actions.social.twitter.mentions_monitor_details import ( diff --git a/cdp-agentkit-core/cdp_agentkit_core/actions/social/twitter/context.py b/cdp-agentkit-core/cdp_agentkit_core/actions/social/twitter/context.py index 721055167..840ddfbe3 100644 --- a/cdp-agentkit-core/cdp_agentkit_core/actions/social/twitter/context.py +++ b/cdp-agentkit-core/cdp_agentkit_core/actions/social/twitter/context.py @@ -1,28 +1,94 @@ +# from contextvars import ContextVar + +# import tweepy + +# api: ContextVar[tweepy.API] = ContextVar("api", default=None) +# client: ContextVar[tweepy.Client] = ContextVar("client", default=None) + +# def get_api() -> tweepy.API: +# return api.get() + +# def set_api(value: tweepy.API): +# api.set(value) + +# def get_client() -> tweepy.Client: +# return client.get() + +# def set_client(value: tweepy.Client): +# client.set(value) + from contextvars import ContextVar +from contextlib import contextmanager import tweepy from cdp_agentkit_core.actions.context import Context -class Context(Context): - api: ContextVar[tweepy.API] | None = None - client: ContextVar[tweepy.Client] | None = None +class TwitterContext(Context): + mentions = ContextVar("mentions", default=None) - def __init__(self): - super().__init__() - self.api = ContextVar("api", default=None) - self.client = ContextVar("client", default=None) - - def get_api(self) -> tweepy.API: - return self.api.get() - - def set_api(self, value: tweepy.API): - self.api.set(value) + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._client = ContextVar("client", default=None) + # self.mentions = ContextVar("mentions", default=None) def get_client(self) -> tweepy.Client: - return self.client.get() + return self._client.get() def set_client(self, value: tweepy.Client): - self.client.set(value) + self._client.set(value) + +# context: TwitterContext = TwitterContext() + +# def current() -> TwitterContext: +# inst = _context.get() +# if inst is None: +# raise Runtimeerror("TwitterContext not found") + +# return inst + +# def new() -> TwitterContext: + + +# def get_context() -> TwitterContext: +# return _context.get() + +# def set_context(ctx:TwitterContext): +# _context.set(ctx) + +def unwrap() ->TwitterContext: + return _context.get() + +def get_client() -> tweepy.Client: + return _context.get().get_client() + +def set_client(client: tweepy.Client): + _context.get().set_client(client) + + +@contextmanager +def current(): + ctx = _context.get() + + if ctx is None: + raise Runtimeerror("TwitterContext not found") + + try: + yield ctx + finally: + pass + +@contextmanager +def new(): + ctx = TwitterContext() + token = _context.set(ctx) + try: + yield ctx + finally: + _context.reset(token) + +_context = ContextVar('twitter_context', default=None) +_context.set(TwitterContext()) +thread = ContextVar('twitter_threads', default=None) diff --git a/cdp-agentkit-core/cdp_agentkit_core/actions/social/twitter/mentions_monitor_details.py b/cdp-agentkit-core/cdp_agentkit_core/actions/social/twitter/mentions_monitor_details.py index 670b7221a..2e5a3e1d7 100644 --- a/cdp-agentkit-core/cdp_agentkit_core/actions/social/twitter/mentions_monitor_details.py +++ b/cdp-agentkit-core/cdp_agentkit_core/actions/social/twitter/mentions_monitor_details.py @@ -3,14 +3,16 @@ from pydantic import BaseModel from cdp_agentkit_core.actions.social.twitter import TwitterAction, TwitterContext +from cdp_agentkit_core.actions.social.twitter.mentions import items -MENTIONS_MONITOR_DETAILS_PROMPT = """ -""" +MENTIONS_MONITOR_DETAILS_PROMPT = """show details for mentions.""" class MentionsMonitorDetailsInput(BaseModel): pass -def mentions_monitor_details(context: TwitterContext) -> str: +def mentions_monitor_details() -> str: + print(f"size: {items} {items.qsize()}") + return f"size: {items} {items.qsize()}" pass class MentionsMonitorDetailsAction(TwitterAction): diff --git a/cdp-agentkit-core/cdp_agentkit_core/actions/social/twitter/mentions_monitor_start.py b/cdp-agentkit-core/cdp_agentkit_core/actions/social/twitter/mentions_monitor_start.py index 6ad5504fd..aad4c7555 100644 --- a/cdp-agentkit-core/cdp_agentkit_core/actions/social/twitter/mentions_monitor_start.py +++ b/cdp-agentkit-core/cdp_agentkit_core/actions/social/twitter/mentions_monitor_start.py @@ -1,10 +1,17 @@ from collections.abc import Callable +from queue import Queue +import contextvars +import time +import threading -import asyncio +# import asyncio import tweepy + from pydantic import BaseModel from cdp_agentkit_core.actions.social.twitter import TwitterAction, TwitterContext +import cdp_agentkit_core.actions.social.twitter.context as context +from cdp_agentkit_core.actions.social.twitter.mentions import items MENTIONS_MONITOR_START_PROMPT = """ This tool will monitor mentions for the currently authenticated Twitter (X) user context.""" @@ -14,47 +21,144 @@ class MentionsMonitorStartInput(BaseModel): pass +def mentions_monitor_start() -> str: + global trd + # try: + # state = context.get("mentions-state") + # if state == "running": + # return "already running" + # except KeyError as e: + # pass + + # print(context.get_client()) + + # context.client.set(twitterContext.get_client()) + + ctx = contextvars.copy_context() + thread = MonitorMentionsThread(ctx) + context.unwrap().mentions.set(thread) + # context.unwrap().set("test", thread) + context.thread.set(thread) + set_thread(thread) + # thread._ctx = ctx + threads.put(thread) + thread.start() + + print("...") + # print(context.unwrap()) + # print(context.unwrap().mentions.get()) + # print(context.unwrap().get("test")) + # print(context.thread.get()) + print(get_thread()) + + + # threadContext = contextvars.copy_context() + # threadContext.run( + # thread = threading.Thread(target=monitor_mentions, args=(context, )) + # thread.start() + # context.set("mentions-state", "running") + -def mentions_monitor_start(context: TwitterContext) -> str: - try: - state = context.get("mentions-state") - if state == "running": - return "already running" - except KeyError as e: - pass + # asyncio.set_event_loop(loop) + # task = asyncio.create_task(monitor_mentions(context)) + # task = asyncio.ensure_future(monitor_mentions(context)) + # context.set("mentions-task", task) - context.set("mentions-state", "running") - task = asyncio.create_task(monitor_mentions(context)) - context.set("mentions-task", task) + # asyncio.to_thread(monitor_mentions) return "started monitoring" -def monitor_mentions(context: TwitterContext): - last_id = 0 +class MonitorMentionsThread(threading.Thread): + running: bool - - try: - response = context.get_client().get_me() - me = response.data - except tweepy.errors.TweepyException as e: - return f"Error retrieving authenticated user account details: {e}" - - while context.get("mentions-state") == "running": - try: - # mentions = context.get_api().mentions_timeline(since_id=last_id) - mentions = context.get_client().get_users_mentions(me.id, since_id=last_id) - for mention in mentions: - print(f"@{mention.user.screen_name}: {mention.text}") - last_id = mention.id - - except tweepy.errors.TweepyException as e: - print(f"Error: {e}") - return - - asyncio.sleep(60) + def __init__(self, ctx, *args, **kwargs): + super().__init__(*args, **kwargs) + self._ctx = ctx + # self._ctx = context._context + + + # print("inbound") + # print(.get_client()) + + # self.context = contextvars.copy_context() + self.mention_id = 0 + + def run(self): + for var, value in self._ctx.items(): + var.set(value) + + print("CONTEXT") + # print(self._ctx.get_client()) + print(context.get_client()) + + items.put("hi") + # return + # try: + # state = context.get("mentions-monitor-state") + # if state == "running": + # return + # except KeyError as e: + # pass + + # context.set("mentions-monitor-state", "") + self.running = True + self.monitor() + # context.set("mentions-monitor-state", "running") + + def stop(self): + self.running = False + + def monitor(self): + + # try: + # response = context.get_client().get_me() + # me = response.data + # print(me.id) + # except tweepy.errors.TweepyException as e: + # self.running = False + # return f"Error retrieving authenticated user account details: {e}" + + while self.running: + time.sleep(1) + # while True: + # state = context.get("mentions-monitor-state") + # if state == "stopped": + # break + + #context.get("mentions-state") != "stopped": + # try: + # print("fetching mentions") + + # response = context.get_client().get_users_mentions(me.id, since_id=self.mention_id) + # mentions = response.data + + # for mention in mentions: + # if mention is None: + # print("mention is empty") + # continue + + # print(f"@{mention.user.screen_name}: {mention.text}") + # self.mention_id = mention.id + + # except tweepy.errors.TweepyException as e: + # print(f"Error: {e}") + + # time.sleep(15 * 60) + + print("monitoring stopped") class MentionsMonitorStartAction(TwitterAction): name: str = "mentions_monitor_start" description: str = MENTIONS_MONITOR_START_PROMPT args_schema: type[BaseModel] | None = MentionsMonitorStartInput func: Callable[..., str] = mentions_monitor_start + +def get_thread() -> MonitorMentionsThread: + return _thread.get() + +def set_thread(t:MonitorMentionsThread): + _thread.set(t) + +_thread: contextvars.ContextVar[MonitorMentionsThread] = contextvars.ContextVar('monitor-mentions-thread', default=None) + +threads = Queue() diff --git a/cdp-agentkit-core/cdp_agentkit_core/actions/social/twitter/mentions_monitor_stop.py b/cdp-agentkit-core/cdp_agentkit_core/actions/social/twitter/mentions_monitor_stop.py index 5aa24487b..7183270d9 100644 --- a/cdp-agentkit-core/cdp_agentkit_core/actions/social/twitter/mentions_monitor_stop.py +++ b/cdp-agentkit-core/cdp_agentkit_core/actions/social/twitter/mentions_monitor_stop.py @@ -3,15 +3,36 @@ from pydantic import BaseModel from cdp_agentkit_core.actions.social.twitter import TwitterAction, TwitterContext +import cdp_agentkit_core.actions.social.twitter.context as context +from cdp_agentkit_core.actions.social.twitter.mentions_monitor_start import threads MENTIONS_MONITOR_STOP_PROMPT = """ +Stop monitoring twitter mentions. """ class MentionsMonitorStopInput(BaseModel): pass -def mentions_monitor_stop(context: TwitterContext) -> str: - pass +def mentions_monitor_stop() -> str: + print("hmm?") + # print(context.unwrap().get("test")) + # print(context.unwrap().mentions.get()) + # print(context.thread.get()) + + # print(get_thread()) + # print(get_thread().running) + # get_thread().stop() + # print(get_thread().running) + + print(threads.qsize()) + thread = threads.get() + print(thread.running) + thread.stop() + print(thread.running) + + # context.set("mentions-monitor-state", "stopped") + + return "stopping monitoring..." class MentionsMonitorStopAction(TwitterAction): name: str = "mentions_monitor_stop" diff --git a/twitter-langchain/twitter_langchain/twitter_api_wrapper.py b/twitter-langchain/twitter_langchain/twitter_api_wrapper.py index 5ca088f97..434958cf5 100644 --- a/twitter-langchain/twitter_langchain/twitter_api_wrapper.py +++ b/twitter-langchain/twitter_langchain/twitter_api_wrapper.py @@ -8,7 +8,10 @@ # from contextvars_registry import ContextVarsRegistry from cdp_agentkit_core.actions.social.twitter import TwitterContext from langchain_core.utils import get_from_dict_or_env -from pydantic import BaseModel, model_validator +from pydantic import BaseModel, Field, model_validator + +import cdp_agentkit_core.actions.social.twitter.context as context +from cdp_agentkit_core.actions.social.twitter.mentions_monitor_start import get_thread # class TwitterContext(ContextVarsRegistry): # client: tweepy.Client | None = None @@ -16,7 +19,8 @@ class TwitterApiWrapper(BaseModel): """Wrapper for Twitter API.""" - context: TwitterContext | None = None + # twitterContext: TwitterContext | None = None + # client:tweepy.Client = Field(..., description="twitter client") @model_validator(mode="before") @classmethod @@ -26,6 +30,7 @@ def validate_environment(cls, values: dict) -> Any: api_secret = get_from_dict_or_env(values, "twitter_api_secret", "TWITTER_API_SECRET") access_token = get_from_dict_or_env(values, "twitter_access_token", "TWITTER_ACCESS_TOKEN") access_token_secret = get_from_dict_or_env(values, "twitter_access_token_secret", "TWITTER_ACCESS_TOKEN_SECRET") + bearer_token = get_from_dict_or_env(values, "twitter_bearer_token", "TWITTER_BEARER_TOKEN") try: import tweepy @@ -34,29 +39,42 @@ def validate_environment(cls, values: dict) -> Any: "Tweepy Twitter SDK is not installed. " "Please install it with `pip install tweepy`" ) from None - api_auth = tweepy.OAuth1UserHandler( - api_key, - api_secret, - access_token, - access_token_secret, - ) + # api_auth = tweepy.OAuth1UserHandler( + # api_key, + # api_secret, + # access_token, + # access_token_secret, + # ) - api = tweepy.API(api_auth) + # api = tweepy.API(api_auth) client = tweepy.Client( + bearer_token=bearer_token, consumer_key=api_key, consumer_secret=api_secret, access_token=access_token, access_token_secret=access_token_secret, ) - context = TwitterContext() - context.set_api(api) + # ctx = context.get_context() + # ctx.set_client(client) + + # context.set_context(ctx) + context.set_client(client) - values["context"] = context - values["api"] = context.api - values["client"] = context.client + # with context.context() as ctx: + # ctx.set_client(client) + + # twitterContext = TwitterContext() + # context.set_api(api) + # twitterContext.set_client(client) + + # context.client.set(client) + + # values["twitterContext"] = twitterContext + # values["api"] = api + values["client"] = client values["api_key"] = api_key values["api_secret"] = api_secret values["access_token"] = access_token @@ -71,7 +89,29 @@ def run_action(self, func: Callable[..., str], **kwargs) -> str: # func_signature = inspect.signature(func) # first_kwarg = next(iter(func_signature.parameters.values()), None) - return func(self.context, **kwargs) + response = "" + + # with context.context() as ctx: + # print("client") + # print(ctx.get_client()) + + # ctx.set_client(self.client) + + # ctx = contextvars.copy_context() + # for var, value in ctx.items(): + # var.set(value) + + print("client") + print(context.get_client()) + print(context.unwrap()) + print(get_thread()) + + if context.unwrap() is not None: + print("yay?") + print(context.unwrap().mentions.get()) + func(**kwargs) + + return response # if first_kwarg and first_kwarg.annotation is tweepy.Client: # return func(self.client, **kwargs) diff --git a/twitter-langchain/twitter_langchain/twitter_toolkit.py b/twitter-langchain/twitter_langchain/twitter_toolkit.py index 6837c68fb..93577e7d5 100644 --- a/twitter-langchain/twitter_langchain/twitter_toolkit.py +++ b/twitter-langchain/twitter_langchain/twitter_toolkit.py @@ -118,6 +118,7 @@ def from_twitter_api_wrapper(cls, twitter_api_wrapper: TwitterApiWrapper) -> "Tw """ actions = TWITTER_ACTIONS + print(actions) tools = [ TwitterTool(