Skip to content

Commit

Permalink
wip: context and threading, monitor mentions start and stop functional
Browse files Browse the repository at this point in the history
  • Loading branch information
stat committed Nov 13, 2024
1 parent 7408608 commit 714ae44
Show file tree
Hide file tree
Showing 8 changed files with 381 additions and 118 deletions.
127 changes: 78 additions & 49 deletions cdp-agentkit-core/cdp_agentkit_core/actions/context.py
Original file line number Diff line number Diff line change
@@ -1,54 +1,83 @@
from contextvars import ContextVar
from typing import Any
import contextvars
from typing import Any, Optional

from pydantic import BaseModel, ConfigDict


class Context(BaseModel):
model_config = ConfigDict(arbitrary_types_allowed=True)
cvars: ContextVar = ContextVar('context_store', default={})

class Context():
"""Manages multiple context vars"""
def __init__(self):
super().__init__()

if not self.cvars.get():
self.cvars.set({})

self._tokens: dict[str, Any] = {}

def create_var(self, name: str, default=None) -> None:
store = self.cvars.get()

if name not in store:
store[name] = ContextVar(name, default=default)
self.cvars.set(store)

def get(self, name: str) -> Any:
store = self.cvars.get()

if name not in store:
raise KeyError(f"Context variable '{name}' not found")

return store[name].get()

def set(self, name: str, value: Any) -> None:
store = self.cvars.get()

if name not in store:
self.create_var(name)
store = self.cvars.get()

self._tokens[name] = store[name].set(value)

def reset(self, name: str = None) -> None:
if name is not None:
if name in self._tokens:
self._tokens[name].reset()
del self._tokens[name]

else:
for token in self._tokens.values():
token.reset()

self._tokens.clear()
self.cvars.set({})
self._vars: Dict[str, contextvars.ContextVar] = {}

def create(self, key: str, default_value: any = None):
"""Create a new context var with given key name"""
if key in self._vars:
raise ValueError(f"{key} already exists")
self._vars[key] = contextvars.ContextVar(f"{key}", default=default_value)

def set(self, key: str, value: any):
"""Set value for a key"""
if key not in self._vars:
self.create(key)
self._vars[key].set(value)

def get(self, key: str) -> Optional[any]:
"""Get current value for a key"""
if key not in self._vars:
# raise ValueError(f"{key} does not exist")
return None
return self._vars[key].get()

def reset(self, key: str):
"""Reset to its default value"""
if key not in self._vars:
raise ValueError(f"{key} does not exist")
self._vars[key].set(None)

# model_config = ConfigDict(arbitrary_types_allowed=True)
# cvars: ContextVar = ContextVar('context_store', default={})

# def __init__(self):
# super(Context, self).__init__()

# if not self.cvars.get():
# self.cvars.set({})

# self._tokens: dict[str, Any] = {}

# def create_var(self, name: str, default=None) -> None:
# store = self.cvars.get()

# if name not in store:
# store[name] = ContextVar(name, default=default)
# self.cvars.set(store)

# def get(self, name: str) -> Any:
# store = self.cvars.get()

# if name not in store:
# raise KeyError(f"Context variable '{name}' not found")

# return store[name].get()

# def set(self, name: str, value: Any) -> None:
# store = self.cvars.get()

# if name not in store:
# self.create_var(name)
# store = self.cvars.get()

# self._tokens[name] = store[name].set(value)

# def reset(self, name: str = None) -> None:
# if name is not None:
# if name in self._tokens:
# self._tokens[name].reset()
# del self._tokens[name]

# else:
# for token in self._tokens.values():
# token.reset()

# self._tokens.clear()
# self.cvars.set({})
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from cdp_agentkit_core.actions.social.twitter.action import Action as TwitterAction
from cdp_agentkit_core.actions.social.twitter.context import Context as TwitterContext
from cdp_agentkit_core.actions.social.twitter.context import TwitterContext as TwitterContext

from cdp_agentkit_core.actions.social.twitter.account_details import AccountDetailsAction
from cdp_agentkit_core.actions.social.twitter.mentions_monitor_details import (
Expand Down
Original file line number Diff line number Diff line change
@@ -1,28 +1,94 @@
# from contextvars import ContextVar

# import tweepy

# api: ContextVar[tweepy.API] = ContextVar("api", default=None)
# client: ContextVar[tweepy.Client] = ContextVar("client", default=None)

# def get_api() -> tweepy.API:
# return api.get()

# def set_api(value: tweepy.API):
# api.set(value)

# def get_client() -> tweepy.Client:
# return client.get()

# def set_client(value: tweepy.Client):
# client.set(value)

from contextvars import ContextVar
from contextlib import contextmanager

import tweepy

from cdp_agentkit_core.actions.context import Context


class Context(Context):
api: ContextVar[tweepy.API] | None = None
client: ContextVar[tweepy.Client] | None = None
class TwitterContext(Context):
mentions = ContextVar("mentions", default=None)

def __init__(self):
super().__init__()
self.api = ContextVar("api", default=None)
self.client = ContextVar("client", default=None)

def get_api(self) -> tweepy.API:
return self.api.get()

def set_api(self, value: tweepy.API):
self.api.set(value)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._client = ContextVar("client", default=None)
# self.mentions = ContextVar("mentions", default=None)

def get_client(self) -> tweepy.Client:
return self.client.get()
return self._client.get()

def set_client(self, value: tweepy.Client):
self.client.set(value)
self._client.set(value)

# context: TwitterContext = TwitterContext()

# def current() -> TwitterContext:
# inst = _context.get()
# if inst is None:
# raise Runtimeerror("TwitterContext not found")

# return inst

# def new() -> TwitterContext:


# def get_context() -> TwitterContext:
# return _context.get()

# def set_context(ctx:TwitterContext):
# _context.set(ctx)

def unwrap() ->TwitterContext:
return _context.get()

def get_client() -> tweepy.Client:
return _context.get().get_client()

def set_client(client: tweepy.Client):
_context.get().set_client(client)


@contextmanager
def current():
ctx = _context.get()

if ctx is None:
raise Runtimeerror("TwitterContext not found")

try:
yield ctx
finally:
pass

@contextmanager
def new():
ctx = TwitterContext()
token = _context.set(ctx)
try:
yield ctx
finally:
_context.reset(token)

_context = ContextVar('twitter_context', default=None)
_context.set(TwitterContext())

thread = ContextVar('twitter_threads', default=None)
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@
from pydantic import BaseModel

from cdp_agentkit_core.actions.social.twitter import TwitterAction, TwitterContext
from cdp_agentkit_core.actions.social.twitter.mentions import items

MENTIONS_MONITOR_DETAILS_PROMPT = """
"""
MENTIONS_MONITOR_DETAILS_PROMPT = """show details for mentions."""

class MentionsMonitorDetailsInput(BaseModel):
pass

def mentions_monitor_details(context: TwitterContext) -> str:
def mentions_monitor_details() -> str:
print(f"size: {items} {items.qsize()}")
return f"size: {items} {items.qsize()}"
pass

class MentionsMonitorDetailsAction(TwitterAction):
Expand Down
Loading

0 comments on commit 714ae44

Please sign in to comment.