Skip to content
This repository has been archived by the owner on Apr 29, 2024. It is now read-only.

Commit

Permalink
Added score modifiers (#20)
Browse files Browse the repository at this point in the history
  • Loading branch information
spietras authored Oct 18, 2022
1 parent ebdf53c commit 0d18ff7
Show file tree
Hide file tree
Showing 4 changed files with 217 additions and 5 deletions.
83 changes: 78 additions & 5 deletions kilroy_face_twitter/src/kilroy_face_twitter/face.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@
classproperty,
normalize,
)
from kilroy_server_py_utils import CategorizableBasedOptionalParameter
from tweepy import Tweet

from kilroy_face_twitter.client import TwitterClient
from kilroy_face_twitter.models import TweetFields, TweetIncludes
from kilroy_face_twitter.processors import Processor
from kilroy_face_twitter.scorers import Scorer
from kilroy_face_twitter.scoring.modifiers import ScoreModifier
from kilroy_face_twitter.scoring.raw import Scorer
from kilroy_face_twitter.scrapers import Scraper

logger = logging.getLogger(__name__)
Expand All @@ -37,9 +39,11 @@ class Params(SerializableModel):
consumer_secret: str
access_token: str
access_token_secret: str
scoring_type: str
scoring_type: str = "likes"
scorers_params: Dict[str, Dict[str, Any]] = {}
scraping_type: str
score_modifier_type: Optional[str] = None
score_modifiers_params: Dict[str, Any] = {}
scraping_type: str = "timeline"
scrapers_params: Dict[str, Dict[str, Any]] = {}


Expand All @@ -48,6 +52,8 @@ class State:
processor: Processor
scorer: Scorer
scorers_params: Dict[str, Dict[str, Any]]
score_modifier: Optional[ScoreModifier]
score_modifiers_params: Dict[str, Dict[str, Any]]
scraper: Scraper
scrapers_params: Dict[str, Dict[str, Any]]
client: TwitterClient
Expand All @@ -58,6 +64,13 @@ async def _get_params(self, state: State, category: str) -> Dict[str, Any]:
return {**state.scorers_params.get(category, {})}


class ScoreModifierParameter(
CategorizableBasedOptionalParameter[State, ScoreModifier]
):
async def _get_params(self, state: State, category: str) -> Dict[str, Any]:
return {**state.score_modifiers_params.get(category, {})}


class ScraperParameter(CategorizableBasedParameter[State, Scraper]):
async def _get_params(self, state: State, category: str) -> Dict[str, Any]:
return {**state.scrapers_params.get(category, {})}
Expand Down Expand Up @@ -87,6 +100,7 @@ def post_schema(cls) -> JSONSchema:
def parameters(cls) -> Set[Parameter]:
return {
ScorerParameter(),
ScoreModifierParameter(),
ScraperParameter(),
}

Expand All @@ -102,6 +116,20 @@ async def _build_scorer(cls, params: Params) -> Scorer:
**params.scorers_params.get(params.scoring_type, {}),
)

@classmethod
async def _build_score_modifier(
cls, params: Params
) -> Optional[ScoreModifier]:
if params.score_modifier_type is None:
return None
return await cls._build_generic(
ScoreModifier,
category=params.score_modifier_type,
**params.score_modifiers_params.get(
params.score_modifier_type, {}
),
)

@classmethod
async def _build_scraper(cls, params: Params) -> Scraper:
return await cls._build_generic(
Expand All @@ -125,6 +153,8 @@ async def _build_default_state(self) -> State:
processor=await self._build_processor(),
scorer=await self._build_scorer(params),
scorers_params=params.scorers_params,
score_modifier=await self._build_score_modifier(params),
score_modifiers_params=params.score_modifiers_params,
scraper=await self._build_scraper(params),
scrapers_params=params.scrapers_params,
client=await self._build_client(params),
Expand All @@ -140,6 +170,11 @@ async def _save_scorer(state: State, directory: Path) -> None:
if isinstance(state.scorer, Savable):
await state.scorer.save(directory / "scorer")

@staticmethod
async def _save_score_modifier(state: State, directory: Path) -> None:
if isinstance(state.score_modifier, Savable):
await state.score_modifier.save(directory / "score_modifier")

@staticmethod
async def _save_scraper(state: State, directory: Path) -> None:
if isinstance(state.scraper, Savable):
Expand All @@ -150,9 +185,13 @@ async def _create_state_dict(state: State) -> Dict[str, Any]:
return {
"processor_type": state.processor.category,
"scorer_type": state.scorer.category,
"score_modifier_type": state.score_modifier.category
if state.score_modifier is not None
else None,
"scraper_type": state.scraper.category,
"scorers_params": state.scorers_params,
"scrapers_params": state.scrapers_params,
"score_modifiers_params": state.score_modifiers_params,
}

@staticmethod
Expand All @@ -166,6 +205,7 @@ async def _save_state_dict(
async def _save_state(cls, state: State, directory: Path) -> None:
await cls._save_processor(state, directory)
await cls._save_scorer(state, directory)
await cls._save_score_modifier(state, directory)
await cls._save_scraper(state, directory)
state_dict = await cls._create_state_dict(state)
await cls._save_state_dict(state_dict, directory)
Expand Down Expand Up @@ -197,6 +237,19 @@ async def _load_scorer(
default=partial(cls._build_scorer, params),
)

@classmethod
async def _load_score_modifier(
cls, directory: Path, state_dict: Dict[str, Any], params: Params
) -> Optional[ScoreModifier]:
if state_dict.get("score_modifier_type") is None:
return None
return await cls._load_generic(
directory / "score_modifier",
ScoreModifier,
category=state_dict["score_modifier_type"],
default=partial(cls._build_score_modifier, params),
)

@classmethod
async def _load_scraper(
cls, directory: Path, state_dict: Dict[str, Any], params: Params
Expand All @@ -216,6 +269,10 @@ async def _load_saved_state(self, directory: Path) -> State:
processor=await self._load_processor(directory, state_dict),
scorer=await self._load_scorer(directory, state_dict, params),
scorers_params=state_dict["scorers_params"],
score_modifier=await self._load_score_modifier(
directory, state_dict, params
),
score_modifiers_params=state_dict["score_modifiers_params"],
scraper=await self._load_scraper(directory, state_dict, params),
scrapers_params=state_dict["scrapers_params"],
client=await self._build_client(params),
Expand All @@ -237,14 +294,21 @@ async def score(self, post_id: UUID) -> float:
logger.info(f"Scoring post {str(post_id)}...")

async with self.state.read_lock() as state:
fields = state.scorer.needed_fields
if state.score_modifier is not None:
fields = fields + state.score_modifier.needed_fields
response = await state.client.v2.get_tweet(
post_id.int,
user_auth=True,
**state.scorer.needed_fields.to_kwargs(),
**fields.to_kwargs(),
)
tweet = response.data
includes = TweetIncludes.from_response(response)
score = await state.scorer.score(state.client, tweet, includes)
if state.score_modifier is not None:
score = await state.score_modifier.modify(
tweet, includes, score
)

logger.info(f"Score for post {str(post_id)}: {score}.")
return score
Expand All @@ -255,10 +319,13 @@ async def _fetch(
tweets: AsyncIterable[Tuple[Tweet, TweetIncludes]],
processor: Processor,
scorer: Scorer,
score_modifier: Optional[ScoreModifier],
) -> AsyncIterable[Tuple[UUID, Dict[str, Any], float]]:
async for tweet, includes in tweets:
post_id = UUID(int=tweet.id)
score = await scorer.score(client, tweet, includes)
if score_modifier is not None:
score = await score_modifier.modify(tweet, includes, score)

try:
post = await processor.convert(client, tweet, includes)
Expand All @@ -277,10 +344,16 @@ async def scrap(
fields = TweetFields(tweet_fields=["id"])
fields = fields + state.processor.needed_fields
fields = fields + state.scorer.needed_fields
if state.score_modifier is not None:
fields = fields + state.score_modifier.needed_fields
tweets = state.scraper.scrap(state.client, fields, before, after)

posts = self._fetch(
state.client, tweets, state.processor, state.scorer
state.client,
tweets,
state.processor,
state.scorer,
state.score_modifier,
)
if limit is not None:
posts = stream.take(posts, limit)
Expand Down
Empty file.
139 changes: 139 additions & 0 deletions kilroy_face_twitter/src/kilroy_face_twitter/scoring/modifiers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
import json
from abc import ABC, abstractmethod
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Any

import numpy as np
from detoxify import Detoxify
from kilroy_face_py_shared import SerializableModel
from kilroy_face_server_py_sdk import Categorizable, classproperty, normalize
from kilroy_server_py_utils import Configurable, Parameter, background
from tweepy import Tweet

from kilroy_face_twitter.models import TweetFields, TweetIncludes
from kilroy_face_twitter.toxicity import load_model


class ScoreModifier(Categorizable, ABC):
@classproperty
def category(cls) -> str:
name: str = cls.__name__
return normalize(name.removesuffix("ScoreModifier"))

@classproperty
@abstractmethod
def needed_fields(cls) -> TweetFields:
pass

@abstractmethod
async def modify(
self, tweet: Tweet, includes: TweetIncludes, score: float
) -> float:
pass


# Toxicity


class ToxicityScoreModifierParams(SerializableModel):
threshold: float = 0.8
alpha: float = 0.9


@dataclass
class ToxicityScoreModifierState:
detoxify: Detoxify
threshold: float
alpha: float


class ToxicityScoreModifier(
ScoreModifier, Configurable[ToxicityScoreModifierState]
):
class ThresholdParameter(Parameter[ToxicityScoreModifierState, float]):
@classproperty
def schema(cls) -> Dict[str, Any]:
return {
"type": "number",
"minimum": 0,
"maximum": 1,
"title": cls.pretty_name,
"default": 0.8,
}

class AlphaParameter(Parameter[ToxicityScoreModifierState, float]):
@classproperty
def schema(cls) -> Dict[str, Any]:
return {
"type": "number",
"minimum": 0,
"maximum": 1,
"title": cls.pretty_name,
"default": 0.9,
}

async def _build_default_state(self) -> ToxicityScoreModifierState:
params = ToxicityScoreModifierParams(**self._kwargs)
return ToxicityScoreModifierState(
detoxify=await background(load_model),
threshold=params.threshold,
alpha=params.alpha,
)

@classmethod
async def _save_state(
cls, state: ToxicityScoreModifierState, directory: Path
) -> None:
state_dict = {
"threshold": state.threshold,
"alpha": state.alpha,
}
with open(directory / "state.json", "w") as f:
json.dump(state_dict, f)

async def _load_saved_state(
self, directory: Path
) -> ToxicityScoreModifierState:
params = ToxicityScoreModifierParams(**self._kwargs)
with open(directory / "state.json", "r") as f:
state_dict = json.load(f)
return ToxicityScoreModifierState(
detoxify=await background(load_model),
threshold=state_dict.get("threshold", params.threshold),
alpha=state_dict.get("alpha", params.alpha),
)

@classproperty
def needed_fields(cls) -> TweetFields:
return TweetFields(tweet_fields=["text"])

async def modify(
self, tweet: Tweet, includes: TweetIncludes, score: float
) -> float:
async with self.state.read_lock() as state:
toxicity = state.detoxify.predict(tweet.text)["toxicity"]
threshold = state.threshold
alpha = state.alpha
return self.modifier(toxicity, threshold, alpha) * score

@staticmethod
def modifier(x: float, threshold: float, alpha: float) -> float:
x = np.clip(x, 0, 1).item()
threshold = np.clip(threshold, 0, 1).item()
alpha = np.clip(alpha, 0, 1).item()

if x == 0 or x == 1:
return 1 - x
if threshold == 0:
return 0
if threshold == 1:
return 1
if alpha == 1 and x == threshold:
return 1

inner_exponent = (-np.log(2) / np.log(threshold)).item()
outer_exponent = 1 / (1 - alpha)
inner_value = x**inner_exponent
denominator = 1 + (inner_value / (1 - inner_value)) ** outer_exponent
return 1 / denominator

0 comments on commit 0d18ff7

Please sign in to comment.