",
+ "waitTime": 300,
+ "reports": {
+ "rules": [
+ {
+ "name": "No spam links",
+ "type": "link_resolver",
+ "blocked": ["evilspam\\.website", "dontmarry\\.com"],
+ "severity": 2,
+ "punishment": {
+ "type": "suspend",
+ "message": "Your account has been suspended for spamming."
+ }
+ },
+ {
+ "name": "No link-in-bio spammers",
+ "type": "bio_content",
+ "blocked": ["sexie.ru"],
+ "severity": 1,
+ "punishment": {
+ "type": "disable",
+ "message": "Your account has been disabled for spamming."
+ }
+ }
+ ]
+ },
+ "pendingAccounts": {
+ "rules": [
+ {
+ "name": "No tags",
+ "_comment": "Because honestly, you're definitely a bot if you're putting tags into the field",
+ "type": "message_content",
+ "blocked": [".*"],
+ "severity": 1,
+ "punishment": {
+ "type": "reject"
+ }
+ },
+ {
+ "name": "StopForumSpam test",
+ "type": "stopforumspam",
+ "threshold": 95,
+ "severity": 1,
+ "punishment": {
+ "type": "reject"
+ }
+ }
+ ]
+ }
+}
```
-## Caveats
+A more [in-depth guide to Ivory configuration](https://github.com/bclindner/ivory/wiki/Getting-Started)
+and the list of [rules](https://github.com/bclindner/ivory/wiki/Rules) and
+[punishments](https://github.com/bclindner/ivory/wiki/Punishments)
+can be found on the wiki.
+
+Ideally you'll only have to change this config once in a blue moon, but when
+you do, you can set the `"dryRun": true` option to prevent Ivory from taking
+action, letting you test your rules against recent items in your live
+moderation queues.
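+For example, during testing the top of your config might look like this (the
+token and instance URL here are placeholders):
+
+```json
+{
+  "token": "...",
+  "instanceURL": "https://your.instance",
+  "dryRun": true,
+  "waitTime": 300,
+  ...
+}
+```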
-This code is using Selenium to drive its report handling system. Selenium can be
-finicky. Stuff can break.
+### Running
-Take care when writing your rules. Ivory doesn't care if you get them wrong, and
-Ivory will absolutely ban users with impunity if you do. Test them if you can.
-Support for dry runs will come available when I get to it.
+After you've set up a config file, run the following in a Linux terminal:
-## Maintainers
+```sh
+# if you're running in the same terminal session you installed from, you can
+# skip this next line:
+source bin/activate
+python .
+```
-This is currently solely maintained by me,
-[@bclindner@mastodon.technology](http://mastodon.technology/@bclindner).
+Hopefully, no errors will be thrown and Ivory will start up and begin its first
+moderation pass, reading the first page of active reports and pending users and
+applying the rules you've set. Ivory will handle these queues every 300 seconds,
+or 5 minutes. (This is controlled by the `waitTime` field of the above config
+file - if you wanted 10 minutes, you could set it to 600!)
+If you'd rather run it on some other schedule via a proper task scheduler like
+cron or a systemd .timer unit, you can use `python . oneshot`, which will run
+Ivory only once. This sample cron line will run Ivory every 5 minutes and output
+to a log file:
+
+```cron
+*/5 * * * * cd /absolute/path/to/ivory; ./bin/python . oneshot >> ivory.log
+```
+
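+If you go the systemd route instead, a minimal sketch of the two units might
+look like this (the unit names and paths are placeholders - adjust them for
+your install):
+
+```ini
+# ivory.service (hypothetical unit)
+[Unit]
+Description=Ivory moderation pass
+
+[Service]
+Type=oneshot
+WorkingDirectory=/absolute/path/to/ivory
+ExecStart=/absolute/path/to/ivory/bin/python . oneshot
+
+# ivory.timer (hypothetical unit) - enable with `systemctl enable --now ivory.timer`
+[Timer]
+OnCalendar=*:0/5
+
+[Install]
+WantedBy=timers.target
+```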
+## Extending (custom rules)
+
+You'll notice `rules/` is a flat folder of Python scripts, one per Ivory rule.
+If you've got a little Python experience, you can easily create your own rules
+by dropping in a new Python file, using one of the existing files in the folder
+as a jumping-off point - a minimal skeleton is sketched below.
+
+The reports and pending accounts that Ivory rules receive are the same as what
+Mastodon.py provides for
+[reports](https://mastodonpy.readthedocs.io/en/stable/#report-dicts) and [admin
+accounts](https://mastodonpy.readthedocs.io/en/stable/#admin-account-dicts),
+respectively.
+
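+As a rough sketch (the class name, filename, and the field being checked are
+just placeholders), a pending-account rule could look like this:
+
+```python
+# rules/filename_of_your_rule.py - a hypothetical minimal rule
+import re
+
+from judge import Rule
+from schemas import RegexBlockingRule
+
+class MyCoolNewRule(Rule):
+    def __init__(self, raw_config):
+        # validate this rule's config block against a shared schema
+        config = RegexBlockingRule(raw_config)
+        Rule.__init__(self, **config)
+        self.blocked = config['blocked']
+
+    def test_pending_account(self, account: dict):
+        # return True if the signup blurb matches a blocked regex
+        # (report rules implement test_report instead)
+        for regex in self.blocked:
+            if re.search(regex, str(account.get('invite_request', ''))):
+                return True
+        return False
+
+# Ivory's rule loader looks for a module-level `rule` name
+rule = MyCoolNewRule
+```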
+**Don't forget to use `dryRun` in your config when testing your new rule!**
+
+Once you've finished writing up your custom rule, say as
+`rules/filename_of_your_rule.py`, you can address it by its filename in your
+config:
+
+```json
+...
+"pendingAccounts": {
+ "rules": [
+ {
+ "name": "An instance of my cool new rule",
+ "type": "filename_of_your_rule",
+ "custom_option": true,
+ "severity": 5,
+ "punishment": {
+ "type": "reject"
+ }
+ }
+ ]
+}
+...
+```
+
+If you come up with a useful rule and wouldn't mind writing a schema and some
+tests for it, making a pull request to include it in Ivory's main release would
+be highly appreciated! The more rules Ivory gets, the more tools are
+collectively available to other admins for dealing with spammers and other
+threats to the Fediverse at large.
## Bugs & Contributing
-Contributions welcome. I'm a JS dev, not a Python one, so I may well need
-them.
+If you have any issues with Ivory not working as expected, please file a GitHub
+issue.
-Got bugs or feature request? File them as a GitHub issue and I'll address them
-when I can. Same goes for PRs.
+Contributions are also welcome - send in pull requests if you have anything new
+to add.
+
+If you have any other questions, go ahead and [ping me on
+Mastodon](https://mastodon.technology/@bclindner) and I might be able to answer
+them.
diff --git a/__main__.py b/__main__.py
old mode 100644
new mode 100755
index ee58eaa..3bbd4af
--- a/__main__.py
+++ b/__main__.py
@@ -1,4 +1,46 @@
+#!/usr/bin/env python3
+"""
+Main "executable" for Ivory.
+
+Running this file will start Ivory in watch mode, using the config provided in
+config.json.
+"""
+
+import logging
+import sys
+import json
+import argparse
from ivory import Ivory
+from constants import DEFAULT_CONFIG_PATH, COMMAND_WATCH, COMMAND_ONESHOT
-if __name__ == '__main__':
- Ivory().run()
+if __name__ == "__main__":
+ logger = logging.getLogger(__name__)
+ argparser = argparse.ArgumentParser(
+ description="A Mastodon automoderator.")
+ argparser.add_argument("--config",
+ dest="configpath",
+ help="Path to the configuration file (default is config.json)",
+ default=DEFAULT_CONFIG_PATH)
+ argparser.add_argument('command',
+ help="Command to run (oneshot to run once, watch to run on a loop). Runs in watch mode by default.",
+ default=COMMAND_WATCH,
+ nargs='?',
+ choices=[COMMAND_WATCH, COMMAND_ONESHOT])
+ args = argparser.parse_args()
+ try:
+ # set up logging
+ logging.basicConfig(stream=sys.stdout)
+ with open(args.configpath) as config_file:
+ config = json.load(config_file)
+ logging.getLogger().setLevel(config.get('logLevel', logging.INFO))
+ # start up ivory in watch mode
+ if args.command == COMMAND_WATCH:
+ Ivory(config).watch()
+ elif args.command == COMMAND_ONESHOT:
+ Ivory(config).run()
+ except OSError as err:
+ logger.exception("failed to load config file")
+ exit(1)
+ except KeyboardInterrupt as err:
+ logger.info("interrupt signal detected, exiting")
+ exit(1)
diff --git a/constants.py b/constants.py
index 5422a23..92bf829 100644
--- a/constants.py
+++ b/constants.py
@@ -1 +1,23 @@
-VERSION = "0.2"
+"""
+Constants for Ivory.
+"""
+
+# Ivory version number
+VERSION = "v1.0.0"
+
+# Default amount of seconds to wait between report passes
+DEFAULT_WAIT_TIME = 300
+
+# Default configuration path to use when no path is manually specified
+DEFAULT_CONFIG_PATH = "config.json"
+
+# Punishment types
+PUNISH_WARN = "warn"
+PUNISH_REJECT = "reject"
+PUNISH_DISABLE = "disable"
+PUNISH_SILENCE = "silence"
+PUNISH_SUSPEND = "suspend"
+
+# Command types
+COMMAND_WATCH = "watch"
+COMMAND_ONESHOT = "oneshot"
diff --git a/core.py b/core.py
deleted file mode 100644
index f21db0a..0000000
--- a/core.py
+++ /dev/null
@@ -1,192 +0,0 @@
-"""
-Base classes for the Ivory system.
-"""
-from typing import List
-
-
-class User:
- """
- A simplified class representation of a Mastodon user.
- """
-
- def __init__(self, user_id: str, username: str):
- self.id = user_id
- self.username = username
-
- def __repr__(self):
- return "User %s (@%s)" % (self.id, self.username)
-
- def __str__(self):
- return self.username
-
-
-class Report:
- """
- A class representation of a Mastodon report.
-
- Since reports from other instances won't have accounts attached, drivers
- cannot create reporter users, so they should just set the reporter field to None.
- """
-
- def __init__(self,
- report_id: str,
- status: str,
- reported: User,
- reporter: User,
- report_comment: str,
- reported_posts: List[str],
- reported_links: List[str]):
- # ugh
- self.report_id = report_id
- self.status = status
- self.reporter = reporter
- self.reported = reported
- self.comment = report_comment
- self.posts = reported_posts
- self.links = reported_links
-
- def __str__(self):
- return "Report #%s (%s)" % (self.report_id, self.reported.username)
-
- def __repr__(self):
- return self.__str__() # lol
-
-
-class Punishment:
- """
- A Punishment is Ivory's representation of a moderation action.
- Punishments are of a specific type (usually suspend, silence, or warn), and
- have a given severity.
- Punishments also optionally come with a configuration dict that defines
- extra options.
- NOTE: Should this be a base class where we derive punishments?
- """
-
- def __init__(self, config):
- self.type = config['type']
- self.severity = config['severity']
- self.config = config
-
- def __str__(self):
- return "Punishment (%s)" % self.type
-
- def __repr__(self):
- return "Punishment (%s, severity %s)" % (self.type, self.severity)
-
-
-class Rule:
- """
- Rules in Ivory are kind of like unit tests for reports.
- Each one can be run against a report to determine if it passes or fails.
- In this case, it differs in that a Rule comes with a Punishment.
- """
-
- def __init__(self, config):
- self.name = config['name']
- self.punishment = Punishment(config['punishment'])
-
- def test(self, report: Report):
- """
- Test the rule against a given report.
- """
-
- def __str__(self):
- return self.name
-
- def __repr__(self):
- return "Rule '%s' (%s)" % (self.name, type(self).__name__)
-
-
-class Judge:
- """
- Interface for judging reports based on rules.
-
- The Judge class is a dead-simple class that holds Rule objects, and allows
- you to test each Rule on a single Report object with the make_judgement
- method.
- """
- rules = []
-
- def add_rule(self, rule):
- """
- Add a rule to the judge's list.
-
- Future judgements will use this rule.
- """
- self.rules.append(rule)
-
- def clear_rules(self):
- """
- Clear this judge's rules list.
- """
- self.rules = []
-
- def make_judgement(self, report: Report) -> (Punishment, List[Rule]):
- """
- Judge a report.
-
- Returns:
- final_verdict: Returns the Punishment object that should be used for
- this report, or None if there is none.
- rules_broken: The list of rules the judge determined were broken.
- """
- most_severe_rule = None
- rules_broken = set()
- for rule in self.rules:
- if rule.test(report):
- rules_broken.add(rule)
- if (most_severe_rule is None or
- most_severe_rule.punishment.severity < rule.punishment.severity):
- most_severe_rule = rule
- if most_severe_rule is not None:
- final_verdict = most_severe_rule.punishment
- else:
- final_verdict = None
- return (final_verdict, rules_broken)
-
-
-class Driver:
- """
- A dummy Ivory driver to derive your own custom drivers from.
-
- For Ivory to work, all of the below base methods must be defined.
- """
-
- def __init__(self):
- pass
-
- def get_unresolved_report_ids(self):
- """
- Get a list of unresolved report IDs.
-
- Returns:
- list: An array of report ID strings.
- """
- raise NotImplementedError()
-
- def get_report(self, report_id: str):
- """
- Get a report by ID.
-
- The report ID is currently a string type as it will almost certainly be
- plugged into a URL or a REST API. This may change in future major versions.
- """
- raise NotImplementedError()
-
- def punish(self, report: Report, punishment: Punishment):
- """
- Apply a punishment to a report.
-
- Driver classes should implement Punishments of type "suspend",
- "silence", and "warn", if possible. If it isn't possible to implement
- them, raise a NotImplementedError. You can define custom punishments,
- but be wary of extending too much.
- """
- raise NotImplementedError()
-
- def add_note(self, report_id: str, message: str, resolve: bool = False):
- """
- Add a note to a report by its ID.
- """
- raise NotImplementedError()
-
diff --git a/drivers/__init__.py b/drivers/__init__.py
deleted file mode 100644
index e69de29..0000000
diff --git a/drivers/browser.py b/drivers/browser.py
deleted file mode 100644
index fcd4740..0000000
--- a/drivers/browser.py
+++ /dev/null
@@ -1,226 +0,0 @@
-import pickle
-# for (safely) parsing URLs
-from urllib.parse import urljoin
-# for getting passwords during initialization
-from getpass import getpass
-# The WebDriver itself
-from selenium import webdriver
-# WebDriverWait stuff
-from selenium.webdriver.common.by import By
-from selenium.webdriver.support.ui import WebDriverWait
-from selenium.webdriver.support import expected_conditions as EC
-# Selenium exceptions which we use to handle some potential fault areas
-from selenium.common.exceptions import NoSuchElementException, TimeoutException
-# Ivory imports
-from core import Report, Driver, User
-from exceptions import ConfigurationError, DriverError, DriverNetworkError
-
-# Default timeout for the Selenium driver to get a Web page.
-DEFAULT_TIMEOUT = 30
-
-class BrowserDriver(Driver):
- """
- A Selenium-based browser driver for Ivory.
- """
-
- def __init__(self, config):
- Driver.__init__(self)
- try:
- self.instance_url = config['instance_url']
- except KeyError:
- raise ConfigurationError("instance_url not found")
- self.__driver = webdriver.Firefox()
- self.__wait = WebDriverWait(self.__driver, DEFAULT_TIMEOUT)
- # Attempt log-in
- cookies = []
- try:
- with open('cookies.pickle', 'rb') as cookies_file:
- cookies = pickle.load(cookies_file)
- print('Found cookies; using those instead of asking you for login')
- except Exception:
- print("Failed to open cookies file; manual login required")
- if cookies:
- self.__login_with_cookies(cookies)
- print('Looks like login was successful.')
- else:
- email = input('Enter email: ')
- password = getpass(prompt='Enter password: ')
- otp = input('If using OTP, enter OTP token: ')
- cookies = self.__login_with_credentials(email, password, otp)
- print('Looks like login was successful. Saving cookies...')
- with open('cookies.pickle', 'wb') as cookies_file:
- pickle.dump(cookies, cookies_file)
-
- def __url(self, path=''):
- return urljoin(self.instance_url, path)
-
- def __login_with_cookies(self, cookies):
- """
- Login to the given instance using stored cookies.
-
- Current implementation is pretty rough - needs more checking to see if the
- cookies are actually working or not.
- """
- self.__driver.get(self.__url())
- for cookie in cookies:
- self.__driver.add_cookie(cookie)
- return True
-
- def __login_with_credentials(self, email, password, otp=""):
- """
- Login to the given instance using user credentials.
-
- Returns:
- list: A list of cookies in Selenium-consumable form.
- Save this and use it with login_with_cookies for future logins.
- """
- try:
- self.__driver.get(self.__url('/auth/sign_in'))
- # TODO add wait here for safety
- # Email + Password
- self.__driver.find_element_by_id("user_email").send_keys(email)
- pwfield = self.__driver.find_element_by_id("user_password")
- pwfield.send_keys(password)
- pwfield.submit()
- except TimeoutException:
- raise DriverError("failed logging in - OTP page could not be reached")
- except NoSuchElementException:
- raise DriverError("failed to input login details - has this instance's login page been modified?")
-
- # OTP
- # TODO NON-OTP IS UNTESTED
- if otp:
- # Server needs a sec to catch up
- try:
- self.__wait.until(EC.presence_of_element_located((By.ID, 'user_otp_attempt')))
- otpfield = self.__driver.find_element_by_id('user_otp_attempt')
- otpfield.send_keys(otp)
- otpfield.submit()
- except TimeoutException:
- raise DriverError("failed logging in - OTP page could not be reached")
- except NoSuchElementException:
- raise DriverError("failed to input and submit OTP code")
- # Server needs a sec to catch up
- try:
- self.__wait.until(EC.url_contains('getting-started'))
- except TimeoutException:
- raise DriverError("failed logging in - homepage could not be reached")
- # Grab cookies
- cookies = self.__driver.get_cookies()
- return cookies
-
- def get_unresolved_report_ids(self):
- """
- Scrapes the page to get unresolved reports.
-
- This does not get all reports! Currently it's just designed to get what
- it can. If you end up with more than a page of reports at a time, this
- driver might not work for you for now.
- """
- try:
- self.__driver.get(self.__url('/admin/reports'))
- self.__wait.until(EC.title_contains('Reports'))
- link_elements = self.__driver.find_elements_by_xpath(
- '//div[@class="report-card__summary__item__content"]/a')
- links = [link.get_attribute('href').split('/')[-1] for link in link_elements]
- return links
- except TimeoutException:
- raise DriverNetworkError("timed out navigating to reports page")
-
- def get_report(self, report_id: str):
- """
- Scrape the report page into a Report object.
- """
- # Parse the report from the page itself
- self.__driver.get(self.__url('/admin/reports/') + report_id)
- try:
- self.__wait.until(EC.title_contains('Report #'+report_id))
- except TimeoutException:
- raise DriverNetworkError("timed out getting report #"+report_id)
- # Get report status
- report_status = self.__driver.find_element_by_xpath(
- '//table[1]//tr[5]/td[1]').text
- # Get reported user
- reported_row = self.__driver.find_element_by_xpath(
- '//table[1]//tr[1]/td[1]/a')
- reported_username = reported_row.get_attribute('title')
- reported_id = reported_row.get_attribute('href').split('/')[-1]
- reported_user = User(reported_id, reported_username)
- # Get reporter data
- try:
- reporter_row = self.__driver.find_element_by_xpath(
- '//table[1]//tr[2]/td[1]/a')
- reporter_username = reporter_row.get_attribute('title')
- reporter_id = reporter_row.get_attribute('href').split('/')[-1]
- reporter_user = User(reporter_id, reporter_username)
- except NoSuchElementException:
- # If that block failed, then this was probably a federated report
- # forwarded to us - set reporter_user to None
- reporter_user = None
- # Get reporter's comment
- reporter_comment = self.__driver.find_element_by_class_name(
- 'speech-bubble__bubble').text
- # Get reported posts
- # Un-CW all posts for deserialization
- cwposts = self.__driver.find_elements_by_tag_name('details')
- for post in cwposts:
- post.click()
- posts = self.__driver.find_elements_by_class_name('status__content')
- reported_posts = [post.text for post in posts]
- # Get links in reported posts
- links = self.__driver.find_elements_by_xpath(
- '//div[@class="status__content"]//a')
- reported_links = [link.get_attribute(
- 'href') for link in links] # lmfao
- # Turn it all into a Report object and send it back
- report = Report(report_id, report_status, reported_user,
- reporter_user, reporter_comment, reported_posts, reported_links)
- return report
-
- def add_note(self, report_id: str, message: str, resolve: bool = False):
- """
- Adds a note through the reports page directly.
- """
- # Parse the report from the page itself
- self.__driver.get(self.__url('/admin/reports/') + report_id)
- self.__wait.until(EC.title_contains('Report #'+report_id))
- self.__driver.find_element_by_id(
- 'report_note_content').send_keys(message)
- buttons = self.__driver.find_elements_by_class_name('btn')
- if resolve:
- buttons[0].click()
- else:
- buttons[1].click()
- self.__wait.until(EC.presence_of_element_located(
- (By.CLASS_NAME, 'notice')))
-
- def punish(self, report, punishment):
- """
- Punish a user.
- """
- if punishment.type == 'suspend':
- self.suspend(report.report_id)
- else:
- raise NotImplementedError()
-
- def suspend(self, report_id):
- """
- Suspends a user through the reports page directly.
- """
- try:
- self.__driver.get(self.__url('/admin/reports/') + report_id)
- self.__wait.until(EC.title_contains('Report #'+report_id))
- except TimeoutException:
- raise DriverNetworkError("failed to get report #"+report_id)
- try:
- self.__driver.find_element_by_xpath(
- '//div[@class="content"]/div[1]//a[text() = "Suspend"]').click()
- self.__wait.until(EC.title_contains('Perform moderation'))
- self.__driver.find_element_by_class_name('btn').click()
- self.__wait.until(EC.title_contains('Reports'))
- except TimeoutException:
- raise DriverNetworkError("timed out attempting to suspend user")
- except NoSuchElementException:
- raise DriverError("malformed page")
-
-driver = BrowserDriver
diff --git a/exceptions.py b/exceptions.py
deleted file mode 100644
index 02be62d..0000000
--- a/exceptions.py
+++ /dev/null
@@ -1,34 +0,0 @@
-class ConfigurationError(Exception):
- """
- Generic exception for config problems.
-
- No derivative of this should be necessary - throw a ConfigurationError in
- any case and Ivory should exit after printing the message provided in the
- error.
- In the future I may extend this by having it provide the name of the key
- it failed on.
- """
-
-class DriverError(Exception):
- """
- Generic base exception for Driver problems.
- """
-
-class DriverNetworkError(DriverError):
- """
- Exception for network errors a Driver might encounter.
-
- Ivory currently assumes these network errors are temporary, and will
- attempt to retry when this error is raised.
- """
-
-class DriverAuthorizationError(DriverError):
- """
- Exception raised if the driver does not have access to something it needs
- using the provided credentials.
- """
-
-class RuleError(Exception):
- """
- Generic base exception for Rule problems.
- """
\ No newline at end of file
diff --git a/ivory.py b/ivory.py
index b3ea730..fc6da74 100644
--- a/ivory.py
+++ b/ivory.py
@@ -1,148 +1,186 @@
-from importlib import import_module
-import traceback
-import time
-from typing import List
+"""
+Ivory core class file.
+Contains everything you need to run Ivory programmatically.
+"""
+import logging
+import time # for Ivory.watch()
-import yaml
+from mastodon import Mastodon, MastodonError, MastodonGatewayTimeoutError # API wrapper lib
-from core import Judge
-from exceptions import ConfigurationError, DriverError, DriverAuthorizationError, DriverNetworkError
+import constants # Ivory constants
+from judge import ReportJudge, PendingAccountJudge, Punishment # Judge to integrate into Ivory
+from schemas import IvoryConfig
-MAX_RETRIES = 3
-class Ivory:
+class Ivory():
"""
- The core class for the Ivory automoderation system.
- In practice, you really only need to import this and a driver to get it
- running.
+ The main Ivory class, which programmatically handles reports pulled from
+ the Mastodon API.
"""
- handled_reports: List[str] = []
- def __init__(self):
- # get config file
- try:
- with open('config.yml') as config_file:
- config = yaml.load(config_file, Loader=yaml.Loader)
- except OSError:
- print("Failed to open config.yml!")
- exit(1)
- self.debug_mode = config.get('debug_mode', False)
- self.wait_time = config.get('wait_time', 300)
- # parse rules first; fail early and all that
- self.judge = Judge()
- try:
- rules_config = config['rules']
- except KeyError:
- print("ERROR: Couldn't find any rules in config.yml!")
- exit(1)
- rulecount = 1
- for rule_config in rules_config:
- try:
- # programmatically load rule based on type in config
- rule_type = rule_config['type']
- Rule = import_module('rules.' + rule_type).rule
- self.judge.add_rule(Rule(rule_config))
- rulecount += 1
- except ModuleNotFoundError:
- print("ERROR: Rule #%d not found!" % rulecount)
- exit(1)
- except Exception as err:
- print("Failed to initialize rule #%d!" % rulecount)
- raise err
- try:
- driver_config = config['driver']
- # programmatically load driver based on type in config
- module_name = 'drivers.' + driver_config['type']
- Driver = import_module(module_name).driver
- self.driver = Driver(driver_config)
- except KeyError:
- print("ERROR: Driver configuration not found in config.yml!")
- exit(1)
- except ModuleNotFoundError as err:
- if err.name == module_name:
- print("ERROR: Driver not found!")
- exit(1)
- else:
- raise err
- except ConfigurationError as err:
- print("ERROR: Bad configuration:", err)
- exit(1)
- except DriverError as err:
- print("ERROR while initializing driver:", err)
+ def __init__(self, raw_config):
+ """
+ Runs Ivory.
+ """
+ # **Validate the configuration**
+ config = IvoryConfig(raw_config)
+
+ # **Set up logger**
+ self._logger = logging.getLogger(__name__)
+
+ self._logger.info("Ivory version %s starting", constants.VERSION)
+
+ # **Load Judge and Rules**
+ self._logger.info("parsing rules")
+ if 'reports' in config:
+ self.report_judge = ReportJudge(config['reports'].get("rules"))
+ else:
+ self._logger.debug("no report rules detected")
+ self.report_judge = None
+ if 'pendingAccounts' in config:
+ self.pending_account_judge = PendingAccountJudge(config['pendingAccounts'].get("rules"))
+ else:
+ self._logger.debug("no pending account rules detected")
+ self.pending_account_judge = None
+
+
+ # **Initialize and verify API connectivity**
+ self._api = Mastodon(
+ access_token=config['token'],
+ api_base_url=config['instanceURL']
+ )
+ self._logger.debug("mastodon API wrapper initialized")
+ # 2.9.1 required for moderation API
+ if not self._api.verify_minimum_version("2.9.1"):
+ self._logger.error("This instance is not updated to 2.9.1 - this version is required for the Moderation API %s", self._api.users_moderated)
exit(1)
- except Exception as err:
- print("ERROR: Uncaught error while initializing driver!!")
- raise err
+ self._logger.debug("minimum version verified; should be ok")
+ # grab some info which could be helpful here
+ self.instance = self._api.instance()
+ self.user = self._api.account_verify_credentials()
+ # log a bunch of shit
+ self._logger.info("logged into %s as %s",
+ self.instance['uri'], self.user['username'])
+ self._logger.debug("instance info: %s", self.instance)
+ self._logger.debug("user info: %s", self.user)
+
+ # **Set some variables from config**
+ if 'waitTime' not in config:
+ self._logger.info(
+ "no waittime specified, defaulting to %d seconds", constants.DEFAULT_WAIT_TIME)
+ self.wait_time = config.get("waitTime", constants.DEFAULT_WAIT_TIME)
+ self.dry_run = config.get('dryRun', False)
- def handle_reports(self):
+
+ def handle_unresolved_reports(self):
+ """
+ Handles all unresolved reports.
+ """
+ reports = self._api.admin_reports()
+ for report in reports:
+ self.handle_report(report)
+
+ def handle_report(self, report: dict):
+ """
+ Handles a single report.
"""
- Get reports from the driver, and judge and punish each one accordingly.
+ self._logger.info("handling report #%d", report['id'])
+ (punishment, rules_broken) = self.report_judge.make_judgement(report)
+ if rules_broken:
+ self._logger.info("report breaks these rules: %s", rules_broken)
+ if punishment is not None:
+ self._logger.info("handling report with punishment %s", punishment)
+ self.punish(report['target_account']['id'], punishment, report['id'])
+
+ def handle_pending_accounts(self):
+ """
+ Handle all accounts in the pending account queue.
+ """
+ accounts = self._api.admin_accounts(status="pending")
+ for account in accounts:
+ self.handle_pending_account(account)
+
+ def handle_pending_account(self, account: dict):
"""
- retries = 0
+ Handle a single pending account.
+ """
+ self._logger.info("handling pending user %s", account['username'])
+ (punishment, rules_broken) = self.pending_account_judge.make_judgement(account)
+ if rules_broken:
+ self._logger.info("pending account breaks these rules: %s", rules_broken)
+ if punishment is not None:
+ self._logger.info("handling report with punishment %s", punishment)
+ self._logger.debug("punishment cfg: %s", punishment.config)
+ self.punish(account['id'], punishment)
+
+ def punish(self, account_id, punishment: Punishment, report_id=None):
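+ """
+ Apply a punishment to an account, optionally tied to a report.
+ Honors the dryRun option by logging and returning without action.
+ """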
+ if self.dry_run:
+ self._logger.info("ignoring punishment; in dry mode")
+ return
+ maxtries = 3
+ tries = 0
while True:
try:
- reports_to_check = self.driver.get_unresolved_report_ids()
- break
- except DriverNetworkError as err:
- retries += 1
- if retries < MAX_RETRIES:
- print("Failed to check reports - retrying...")
- continue
+ if punishment.type == constants.PUNISH_REJECT:
+ self._api.admin_account_reject(account_id)
+ elif punishment.type == constants.PUNISH_WARN:
+ self._api.admin_account_moderate(
+ account_id,
+ None,
+ report_id,
+ text=punishment.config.get('message')
+ )
+ elif punishment.type == constants.PUNISH_DISABLE:
+ self._api.admin_account_moderate(
+ account_id,
+ "disable",
+ report_id,
+ text=punishment.config.get('message')
+ )
+ elif punishment.type == constants.PUNISH_SILENCE:
+ self._api.admin_account_moderate(
+ account_id,
+ "silence",
+ report_id,
+ text=punishment.config.get('message')
+ )
+ elif punishment.type == constants.PUNISH_SUSPEND:
+ self._api.admin_account_moderate(
+ account_id,
+ "suspend",
+ report_id,
+ text=punishment.config.get('message')
+ )
else:
- print("Failed to get reports:",err)
- break
- for report_id in reports_to_check:
- if report_id in self.handled_reports:
- print("Report #%s skipped" % report_id)
- continue
- retries = 0
- while True:
- try:
- print("Handling report #%s..." % report_id)
- report = self.driver.get_report(report_id)
- (final_verdict, rules_broken) = self.judge.make_judgement(report)
- if final_verdict:
- self.driver.punish(report, final_verdict)
- rules_broken_str = ', '.join(
- [str(rule) for rule in rules_broken]) # lol
- note = "Ivory has suspended this user for breaking rules: "+rules_broken_str
- else:
- note = "Ivory has scanned this report and found no infractions."
- self.driver.add_note(report.report_id, note)
- self.handled_reports.append(report_id)
- # network error handling
- except DriverNetworkError as err:
- retries += 1
- print("Encountered network error:", err)
- if retries < MAX_RETRIES:
- print("Retrying (attempt %d)..." % retries)
- continue
- else:
- print("Max retries hit; skipping...")
- # driver error handling
- except DriverAuthorizationError as err:
- print("Fatal authorization error:",err)
- print("Exiting...")
- exit(1)
- except DriverError as err:
- print("Driver error handling report #"+report_id+":",err)
- print("Skipping...")
- # general exception catch
- except Exception as err:
- print("Error handling report #"+report_id+":",err)
- print("Skipping...")
+ # whoops
+ raise NotImplementedError()
+ break
+ except MastodonGatewayTimeoutError as err:
+ self._logger.warn("gateway timed out. ignoring for now, if that didn't do it we'll get it next pass...")
break
def run(self):
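+ """
+ Run a single moderation pass over unresolved reports and pending
+ accounts.
+ """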
+ self._logger.info("starting moderation pass")
+ try:
+ if self.report_judge:
+ self.handle_unresolved_reports()
+ if self.pending_account_judge:
+ self.handle_pending_accounts()
+ self._logger.info("moderation pass complete")
+ except MastodonError:
+ self._logger.exception(
+ "enountered an API error. waiting %d seconds to try again", self.wait_time)
+
+ def watch(self):
"""
- Runs the Ivory automoderator main loop.
+ Runs run() on a loop, with a delay specified in the "waitTime" field
+ of the config.
"""
while True:
- print('Running report pass...')
- try:
- self.handle_reports()
- except Exception as err:
- print("Unexpected error handling reports:",err)
- if self.debug_mode:
- raise err
- print('Report pass complete.')
- time.sleep(self.wait_time)
+ starttime = time.time()
+ self.run()
+ time_to_wait = self.wait_time - (time.time() - starttime)
+ if time_to_wait > 0:
+ self._logger.debug("waiting for %.4f seconds", time_to_wait)
+ time.sleep(time_to_wait)
+ else:
+ self._logger.warn("moderation pass took longer than waitTime - this will cause significant drift. you may want to increase waitTime")
diff --git a/judge.py b/judge.py
new file mode 100644
index 0000000..6498866
--- /dev/null
+++ b/judge.py
@@ -0,0 +1,151 @@
+"""
+The Judge system, which tests the reports and pending accounts Ivory feeds into
+it.
+"""
+import logging # for logging in Judge
+from typing import List # type hinting for List
+from importlib import import_module # for dynamic rule imports
+
+class Punishment:
+ """
+ A Punishment is Ivory's representation of a moderation action.
+
+ Currently this is a simple proxy to the Mastodon.admin_account_moderate()
+ function's kwargs, with an extra field for the punishment's severity.
+ """
+
+ def __init__(self, severity: int, **config):
+ self.type = config['type']
+ self.severity = severity
+ self.config = config
+
+ def __str__(self):
+ return "Punishment (%s)" % self.type
+
+ def __repr__(self):
+ return "Punishment (%s, severity %s)" % (self.type, self.severity)
+
+
+class Rule:
+ """
+ Rules in Ivory are kind of like unit tests for the data the Judge screens.
+ Each one can be run against a data structure to determine if it passes or
+ fails.
+ In this case, it differs in that a Rule comes with a Punishment.
+ """
+
+ def __init__(self, **config):
+ self.name = config['name']
+ self.punishment = Punishment(config['severity'], **config['punishment'])
+ self._logger = logging.getLogger(__name__)
+
+ def test_report(self, report: dict):
+ """
+ Test a report.
+ """
+ raise NotImplementedError()
+
+ def test_pending_account(self, account: dict):
+ """
+ Test a pending account.
+ """
+ raise NotImplementedError()
+
+ def __str__(self):
+ return self.name
+
+ def __repr__(self):
+ return "Rule '%s' (%s)" % (self.name, type(self).__name__)
+
+
+class Judge:
+ """
+ Interface for judging data based on rules.
+
+ The Judge class is a dead-simple class that holds Rule objects, and allows
+ you to test each Rule on a single dict with the make_judgement
+ method.
+ """
+
+ def __init__(self, rule_configs: List[dict] = None):
+ self.rules = []
+ if rule_configs is not None:
+ self.load_rules(rule_configs)
+
+ def load_rules(self, rule_configs: List[dict]):
+ """
+ Load rules from a list of rule configuration dicts.
+ """
+ rulecount = 1
+ logger = logging.getLogger(__name__)
+ for rule_config in rule_configs:
+ try:
+ # programmatically load rule based on type in config
+ rule_type = rule_config['type']
+ logger.debug(
+ "loading rule #%d of type %s", rulecount, rule_type)
+ Rule = import_module('rules.{}'.format(rule_type)).rule
+ new_rule = Rule(rule_config)
+ self.add_rule(new_rule)
+ rulecount += 1
+ except ModuleNotFoundError as err:
+ logger.exception("rule #%d not found", rulecount)
+ logger.critical("could not parse rules")
+ raise err
+ except Exception as err:
+ logger.exception(
+ "failed to initialize rule #%d", rulecount)
+ logger.critical("could not parse rules")
+ raise err
+ logger.info("%s judge loaded %d rules (%d total)", type(self).__name__, rulecount - 1, len(self.rules))
+
+ def add_rule(self, rule):
+ """
+ Add a rule to the judge's list.
+
+ Future judgements will use this rule.
+ """
+ self.rules.append(rule)
+
+ def clear_rules(self):
+ """
+ Clear this judge's rules list.
+ """
+ self.rules = []
+
+ def test_rule(self, rule: Rule, data: dict):
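+ """
+ Test a single rule against the given data; each Judge subclass
+ picks the appropriate Rule test method.
+ """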
+ raise NotImplementedError()
+
+
+ def make_judgement(self, data: dict) -> (Punishment, List[Rule]):
+ """
+ Judge some data.
+
+ Returns:
+ final_verdict: Returns the Punishment object that should be used for
+ this data, or None if there is none.
+ rules_broken: The list of rules the judge determined were broken.
+ """
+ logger = logging.getLogger(__name__)
+ most_severe_rule = None
+ rules_broken = set()
+ for rule in self.rules:
+ logger.debug("running rule %s", rule)
+ rule_was_broken = self.test_rule(rule, data)
+ if rule_was_broken:
+ rules_broken.add(rule)
+ if (most_severe_rule is None or
+ most_severe_rule.punishment.severity < rule.punishment.severity):
+ most_severe_rule = rule
+ if most_severe_rule is not None:
+ final_verdict = most_severe_rule.punishment
+ else:
+ final_verdict = None
+ return (final_verdict, rules_broken)
+
+class ReportJudge(Judge):
+ def test_rule(self, rule: Rule, data: dict):
+ return rule.test_report(data)
+class PendingAccountJudge(Judge):
+ def test_rule(self, rule: Rule, data: dict):
+ return rule.test_pending_account(data)
diff --git a/requirements.txt b/requirements.txt
index c11ee28..e155ee6 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,7 +1,27 @@
-certifi==2019.3.9
+atomicwrites==1.3.0
+attrs==19.3.0
+autopep8==1.4.4
+beautifulsoup4==4.8.0
+certifi==2019.9.11
chardet==3.0.4
+decorator==4.4.0
idna==2.8
-PyYAML==5.1.1
+importlib-metadata==0.23
+Mastodon.py==1.4.6
+more-itertools==7.2.0
+packaging==19.2
+pluggy==0.13.0
+py==1.8.0
+pycodestyle==2.5.0
+pyparsing==2.4.2
+pytest==5.2.2
+python-dateutil==2.8.0
+python-magic==0.4.15
+pytz==2019.2
requests==2.22.0
-selenium==3.141.0
-urllib3==1.25.3
+six==1.12.0
+soupsieve==1.9.4
+urllib3==1.25.6
+voluptuous==0.11.7
+wcwidth==0.1.7
+zipp==0.6.0
diff --git a/rules/bio_content.py b/rules/bio_content.py
new file mode 100644
index 0000000..4b8d486
--- /dev/null
+++ b/rules/bio_content.py
@@ -0,0 +1,26 @@
+import re
+
+from judge import Rule
+
+from schemas import RegexBlockingRule
+
+class BioContentRule(Rule):
+ def __init__(self, raw_config):
+ config = RegexBlockingRule(raw_config)
+ Rule.__init__(self, **config)
+ self.blocked = config['blocked']
+ def test_report(self, report: dict):
+ """
+ Test if the target account's bio text or fields matches any of the given blocked regexes.
+ """
+ acct = report['target_account']['account']
+ for regex in self.blocked:
+ if re.search(regex, acct.get('note')):
+ return True
+ for field in acct.get('fields'):
+ if re.search(regex, field.get('value')):
+ return True
+
+ return False
+
+rule = BioContentRule
diff --git a/rules/link_content.py b/rules/link_content.py
index cb5b190..d6a77bb 100644
--- a/rules/link_content.py
+++ b/rules/link_content.py
@@ -1,20 +1,26 @@
import re
-from core import Rule, Report
+from judge import Rule
+from util import parse_links_from_statuses
+
+from schemas import RegexBlockingRule
class LinkContentRule(Rule):
"""
A rule which checks for banned link content.
"""
- def __init__(self, config):
- Rule.__init__(self, config)
- self.blocked = config['blocked']
- def test(self, report: Report):
+ def __init__(self, raw_config):
+ config = RegexBlockingRule(raw_config)
+ Rule.__init__(self, **config)
+ self.blocked = []
+ for regex in config['blocked']:
+ self.blocked.append(re.compile(regex))
+ def test_report(self, report: dict):
"""
Test if a post's links matches any of the given blocked regexes.
"""
- for link in report.links:
+ for link in parse_links_from_statuses(report['statuses']):
for regex in self.blocked:
- if re.search(regex, link):
+ if regex.search(link):
return True
return False
diff --git a/rules/link_resolver.py b/rules/link_resolver.py
index 3f8b1e6..1b3702f 100644
--- a/rules/link_resolver.py
+++ b/rules/link_resolver.py
@@ -1,10 +1,14 @@
import re
import requests
-from core import Rule, Report
+from judge import Rule
from constants import VERSION
+from util import parse_links_from_statuses
+
+from schemas import RegexBlockingRule
# HTTP headers for the LinkResolverRule.
# Certain URL shorteners require us to set a valid non-generic user agent.
+# Also, it's just good practice.
HEADERS = {
"User-Agent": "Mozilla/5.0 IvoryAutomod/" + VERSION
}
@@ -14,11 +18,12 @@ class LinkResolverRule(Rule):
A rule which checks for banned links, resolving links to prevent shorturl
mitigation.
"""
- def __init__(self, config):
- Rule.__init__(self, config)
+ def __init__(self, raw_config):
+ config = RegexBlockingRule(raw_config)
+ Rule.__init__(self, **config)
self.blocked = config['blocked']
- def test(self, report: Report):
- for link in report.links:
+ def test_report(self, report: dict):
+ for link in parse_links_from_statuses(report['statuses']):
response = requests.head(link, allow_redirects=True, headers=HEADERS)
resolved_url = response.url
for regex in self.blocked:
diff --git a/rules/message_content.py b/rules/message_content.py
index ae944a4..8982367 100644
--- a/rules/message_content.py
+++ b/rules/message_content.py
@@ -1,19 +1,33 @@
import re
-from core import Rule, Report
+from judge import Rule
+
+from schemas import RegexBlockingRule
class MessageContentRule(Rule):
- def __init__(self, config):
- Rule.__init__(self, config)
+ def __init__(self, raw_config):
+ config = RegexBlockingRule(raw_config)
+ Rule.__init__(self, **config)
self.blocked = config['blocked']
- def test(self, report: Report):
+ def test_report(self, report: dict):
"""
- Test if a post matches any of the given blocked regexes.
+ Test if a status matches any of the given blocked regexes.
"""
- for post in report.posts:
+ for status in report.get('statuses', []):
for regex in self.blocked:
- if re.search(regex, post):
+ if re.search(regex, status.get('content', '')):
return True
return False
+ def test_pending_account(self, account: dict):
+ """
+ Test if a pending account's join reason matches any of the blocked
+ regexes.
+ """
+ if 'invite_request' not in account:
+ return False # can't violate this rule if you don't have a pending blurb :rollsafe:
+ for regex in self.blocked:
+ if re.search(regex, str(account.get('invite_request'))):
+ return True
+ return False
rule = MessageContentRule
diff --git a/rules/stopforumspam.py b/rules/stopforumspam.py
new file mode 100644
index 0000000..0d20198
--- /dev/null
+++ b/rules/stopforumspam.py
@@ -0,0 +1,77 @@
+from judge import Rule
+import requests
+import logging
+
+import schemas
+from voluptuous import Required, Range
+
+Config = schemas.Rule.extend({
+ Required("threshold"): Range(min=0, max=100)
+})
+
+class StopForumSpamRule(Rule):
+ """
+ A rule which pings StopForumSpam's API to see if a user is a reported
+ spammer.
+ """
+ def __init__(self, raw_config):
+ # Validate configuration
+ config = Config(raw_config)
+ Rule.__init__(self, **config)
+ # Caching for known emails/IPs
+ self.email_confidences = {}
+ self.tested_emails = set()
+ self.ip_confidences = {}
+ self.tested_ips = set()
+ self.threshold = config['threshold']
+
+ def calc_confidence(self, ip_confidence, email_confidence):
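+ """
+ Return True if the available confidence score(s) meet the configured
+ threshold, taking the higher of the two when both are present.
+ """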
+ if not email_confidence and not ip_confidence:
+ return False
+ elif email_confidence and not ip_confidence:
+ return email_confidence >= self.threshold
+ elif ip_confidence and not email_confidence:
+ return ip_confidence >= self.threshold
+ else:
+ return max([email_confidence, ip_confidence]) >= self.threshold
+
+ def test_pending_account(self, account: dict):
+ """
+ Test if the reported user's email is listed in StopForumSpam's
+ database.
+
+ See https://www.stopforumspam.com/usage for how we're interfacing with
+ the API here.
+ """
+ email = account['email']
+ ip = account['ip']
+ if email in self.tested_emails and ip in self.tested_ips:
+ self._logger.debug("looks like we've already tested for this user and ip; recalculating to be sure")
+ judgement = self.calc_confidence(self.ip_confidences.get(ip), self.email_confidences.get(email))
+ return judgement
+
+ params = {
+ "email": email,
+ "ip": ip,
+ "json": ''
+ }
+ resp = requests.get("https://api.stopforumspam.org/api", params=params)
+ results = resp.json()
+
+ ip_confidence = results.get('ip', {}).get('confidence')
+ email_confidence = results.get('email', {}).get('confidence')
+ if ip_confidence:
+ self.ip_confidences[ip] = ip_confidence
+ if email_confidence:
+ self.email_confidences[email] = email_confidence
+ self.tested_emails.add(email)
+ self.tested_ips.add(ip)
+
+ judgement = self.calc_confidence(ip_confidence, email_confidence)
+ self._logger.debug("ip confidence {}".format(ip_confidence))
+ self._logger.debug("email confidence {}".format(email_confidence))
+ self._logger.debug("judgement: {}".format(judgement))
+
+ return judgement
+
+rule = StopForumSpamRule
diff --git a/rules/username_content.py b/rules/username_content.py
index 251199a..01e05f9 100644
--- a/rules/username_content.py
+++ b/rules/username_content.py
@@ -1,16 +1,27 @@
import re
+from judge import Rule
-from core import Rule, Report
+from schemas import RegexBlockingRule
class UsernameContentRule(Rule):
- def __init__(self, config):
- Rule.__init__(self, config)
+ def __init__(self, raw_config):
+ config = RegexBlockingRule(raw_config)
+ Rule.__init__(self, **config)
self.blocked = config['blocked']
- def test(self, report: Report):
+ def test_report(self, report: dict):
"""
- Test if the reported user matches any of the blocked regexes."
+ Test if the reported user matches any of the blocked regexes.
"""
- username = report.reported.username
+ username = report['target_account']['account']['username']
+ for regex in self.blocked:
+ if re.search(regex, username):
+ return True
+ return False
+ def test_pending_account(self, account: dict):
+ """
+ Test if the pending user's username matches any of the blocked regexes.
+ """
+ username = account['username']
for regex in self.blocked:
if re.search(regex, username):
return True
diff --git a/schemas.py b/schemas.py
new file mode 100644
index 0000000..f6760a3
--- /dev/null
+++ b/schemas.py
@@ -0,0 +1,61 @@
+"""
+Config schemas used in Ivory and its rules.
+"""
+from voluptuous import Schema, Required, Any, Url, ALLOW_EXTRA
+import constants
+
+ReportPunishment = Schema({
+ Required("type"): Any(
+ constants.PUNISH_WARN,
+ constants.PUNISH_REJECT,
+ constants.PUNISH_DISABLE,
+ constants.PUNISH_SILENCE,
+ constants.PUNISH_SUSPEND
+ )
+}, extra=ALLOW_EXTRA)
+
+PendingAcctPunishment = Schema({
+ Required("type"): Any(
+ constants.PUNISH_REJECT
+ )
+}, extra=ALLOW_EXTRA)
+
+Rule = Schema({
+ Required("name"): str,
+ Required("type"): str,
+ Required("severity"): int,
+}, extra=ALLOW_EXTRA)
+
+ReportRule = Rule.extend({
+ Required("punishment"): ReportPunishment
+})
+
+PendingAcctRule = Rule.extend({
+ Required("punishment"): PendingAcctPunishment
+})
+
+Reports = Schema({
+ Required("rules"): [ReportRule]
+})
+
+PendingAccounts = Schema({
+ Required("rules"): [PendingAcctRule]
+})
+
+IvoryConfig = Schema({
+ Required("token"): str,
+ # I know I should be using Url() here but it didn't work and I'm tired
+ Required("instanceURL"): str,
+ "waitTime": int,
+ "dryRun": bool,
+ "logLevel": Any("CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG"),
+ "reports": Reports,
+ "pendingAccounts": PendingAccounts
+})
+
+# Schemas used by several rules
+
+RegexBlockingRule = Rule.extend({
+ Required("blocked"): [str]
+})
+
diff --git a/tests/conftest.py b/tests/conftest.py
new file mode 100644
index 0000000..b7924a2
--- /dev/null
+++ b/tests/conftest.py
@@ -0,0 +1,186 @@
+import pytest
+import requests
+
+@pytest.fixture(autouse=True)
+def disable_requests(monkeypatch):
+ """
+ Prevent Requests from making any actual network requests.
+ (This also prevents Mastodon.py from doing anything, as it uses requests as
+ well.)
+ """
+ monkeypatch.delattr("requests.sessions.Session.request")
+
+@pytest.fixture
+def MockResponse():
+ """
+ Returns an extendable class that resembles the Requests response class.
+ """
+ class _MockResponse:
+ def __init__(self, **kwargs):
+ self.jsonresponse = kwargs.get("json")
+ self.url = kwargs.get("url")
+ def json(self):
+ return self.jsonresponse
+ return _MockResponse
+
+@pytest.fixture
+def account_field():
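+ """Factory fixture for a fake account profile metadata field dict."""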
+ def _account_field(**kwargs):
+ field = {
+ "name": kwargs.get("name", "name"),
+ "value": kwargs.get("value", "value"),
+ }
+ if kwargs.get('verified') is True:
+ field["verified_at"] = "2019-01-01T00:00:00.000Z"
+ return field
+ return _account_field
+
+
+@pytest.fixture
+def account(account_field):
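+ """Factory fixture for a fake Mastodon account dict."""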
+ def _account(**kwargs):
+ fields = []
+ for cfg in kwargs.get("fields", []):
+ fields.append(account_field(**cfg))
+ return {
+ "id": kwargs.get("account_id", "1"),
+ "username": kwargs.get("username", "fakeuser"),
+ "acct": kwargs.get("username", "fakeuser"),
+ "display_name": kwargs.get("username", "fakeuser"),
+ "locked": False,
+ "bot": False,
+ "created_at": "2019-01-01T00:00:00.000Z",
+ "note": kwargs.get("note", ""),
+ "url": "https://example.com/@user",
+ "avatar": "https://cdn.example.com/testavatar.png",
+ "avatar_static": "https://cdn.example.com/testheader.png",
+ "header": "https://cdn.example.com/testheader.png",
+ "header_static": "https://cdn.example.com/testheader.png",
+ "followers_count": 100,
+ "following_count": 100,
+ "statuses_count": 100,
+ "last_status_at": "2019-01-01T00:00:00.000Z",
+ "emojis": [],
+ "fields": fields
+ }
+ return _account
+
+
+@pytest.fixture
+def pending_account(account):
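+ """Factory fixture for a fake pending (unapproved) admin account dict."""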
+ def _pending_account(**kwargs):
+ return {
+ "id": kwargs.get("account_id", "1"),
+ "username": kwargs.get("username", "fakeuser"),
+ "domain": None,
+ "created_at": "2019-01-01T00:00:00.000Z",
+ "email": kwargs.get("email", "testuser@example.com"),
+ "ip": kwargs.get("ip", "127.0.0.1"),
+ "role": "user",
+ "confirmed": True,
+ "suspended": False,
+ "silenced": False,
+ "disabled": False,
+ "approved": False,
+ "locale": "en",
+ "invite_request": kwargs.get("message", "Test message."),
+ "account": account(**kwargs.get("account", {}))
+ }
+ return _pending_account
+
+
+@pytest.fixture
+def admin_account(account):
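+ """Factory fixture for a fake approved admin account dict."""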
+ def _admin_account(**kwargs):
+ return {
+ "id": kwargs.get("account_id", "1"),
+ "username": kwargs.get("username", "fakeuser"),
+ "domain": kwargs.get("domain"),
+ "created_at": "2019-01-01T00:00:00.000Z",
+ "email": kwargs.get("email", "testuser@example.com"),
+ "ip": kwargs.get("ip", "127.0.0.1"),
+ "role": "user",
+ "confirmed": True,
+ "suspended": False,
+ "silenced": False,
+ "disabled": False,
+ "approved": True,
+ "locale": "en",
+ "invite_request": None,
+ "account": account(**kwargs.get("account", {}))
+ }
+ return _admin_account
+
+
+@pytest.fixture
+def report(account, admin_account, status):
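+ """Factory fixture for a fake admin report dict with optional statuses."""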
+ def _report(**kwargs):
+ reporter = admin_account(**kwargs.get("reporter", {}))
+ reported = admin_account(**kwargs.get("reported", {}))
+ statuses = []
+ for cfg in kwargs.get("statuses", []):
+ statuses.append(status(**cfg))
+ return {
+ "id": kwargs.get("report_id", "1"),
+ "action_taken": False,
+ "comment": kwargs.get("comment", ""),
+ "created_at": "2019-01-01T00:00:00.000Z",
+ "updated_at": "2019-01-01T00:00:00.000Z",
+ "account": reporter,
+ "target_account": reported,
+ "assigned_account": None,
+ "action_taken_by_account": None,
+ "statuses": statuses
+ }
+ return _report
+
+@pytest.fixture
+def status_tag():
+ def _status_tag(name):
+ return {
+ "name": name,
+ "url": "https://example.com/tags/{}".format(name)
+ }
+ return _status_tag
+
+@pytest.fixture
+def status(account, status_tag):
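+ """Factory fixture for a fake Mastodon status dict."""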
+ def _status(**kwargs):
+ tags = []
+ for tagname in kwargs.get("tags", []):
+ tags.append(status_tag(tagname))
+ return {
+ "id": kwargs.get("status_id", "1"),
+ "created_at": "2019-01-01T00:00:00.000Z",
+ "in_reply_to_id": kwargs.get("reply_id", "1"),
+ "in_reply_to_account_id": kwargs.get("replying_to_id", "1"),
+ "sensitive": bool(kwargs.get("spoiler_text", None)),
+ "spoiler_text": kwargs.get("spoiler_text", None),
+ "visibility": "public",
+ "language": "en",
+ "uri": "https://example.com/users/testuser/statuses/1",
+ "url": "https://example.com/@testuser/1",
+ "replies_count": 1,
+ "reblogs_count": 0,
+ "favourites_count": 0,
+ "favourited": False,
+ "reblogged": False,
+ "muted": False,
+ "content": kwargs.get("content", "Test post.
"),
+ "reblog": None,
+ "account": account(**kwargs.get("author", {})),
+ "media_attachments": [],
+ "mentions": [
+ {
+ "id": "1",
+ "username": "fakeuser",
+ "url": "https://example.com/@testuser",
+ "acct": "fakeuser@example.com"
+ }
+ ],
+ "tags": [{"name": "hashtag", "url": "https://example.com/tags/hashtag"}],
+ "emojis": [],
+ "card": None,
+ "poll": None
+ }
+ return _status
diff --git a/tests/test_bio_content.py b/tests/test_bio_content.py
new file mode 100644
index 0000000..2ff7f61
--- /dev/null
+++ b/tests/test_bio_content.py
@@ -0,0 +1,75 @@
+import pytest
+import voluptuous
+from copy import deepcopy
+
+from rules.bio_content import rule as Rule
+
+ruleconfig = {
+ "name": "Test rule",
+ "type": "test_type",
+ # this second one will block "slur" but not "slurp".
+ # just gotta be sure regex is working
+ "blocked": ["badword", "slur[^p]"],
+ "severity": 1,
+ "punishment": {
+ "type": "suspend"
+ }
+}
+
+
+@pytest.fixture
+def rule():
+ return Rule(ruleconfig)
+
+
+def test_requires_blocked():
+ bad_config = deepcopy(ruleconfig)
+ bad_config.pop("blocked")
+ # bad config no have block. no worky :(
+ with pytest.raises(voluptuous.error.MultipleInvalid):
+ Rule(bad_config)
+
+
+def test_evil_report(rule, report):
+ rpt1 = report(
+ reported={
+ "account": {
+ "note": "slur!" # needs the extra char on the end for the regex hehe
+ }
+ }
+ )
+ rpt2 = report(
+ reported={
+ "account": {
+ "fields": [
+ {
+ "name": "normal field",
+ "value": "just passing through here,,,,,"
+ }, {
+ "name": "bad field",
+ "value": "badword"
+ }
+ ]
+ }
+ }
+ )
+ # test that it's checking the notes
+ assert rule.test_report(rpt1)
+ # test that it's checking the fields
+ assert rule.test_report(rpt2)
+
+
+def test_good_report(rule, report):
+ rpt = report(
+ reported={
+ "username": "babword",
+ "note": "slurpy boi",
+ "fields": [
+ {
+ "name": "website",
+ "value": "slurpzo.ne"
+ }
+ ]
+ }
+ )
+ assert not rule.test_report(rpt)
diff --git a/tests/test_ivory.py b/tests/test_ivory.py
new file mode 100644
index 0000000..a610012
--- /dev/null
+++ b/tests/test_ivory.py
@@ -0,0 +1,142 @@
+import pytest
+import ivory
+import time
+
+ivoryconfig = {
+ "token": "testtoken",
+ "instanceURL": "https://testinstance.local",
+ "waitTime": 300,
+ "logLevel": "DEBUG",
+ "reports": {
+ "rules": [
+ {
+ "name": "No bad usernames",
+ "type": "username_content",
+ "blocked": ["badword"],
+ "severity": 1,
+ "punishment": {
+ "type": "suspend",
+ "message": "Your account has been suspended for spamming."
+ }
+ },
+ {
+ "name": "No badwords",
+ "type": "message_content",
+ "blocked": ["badword"],
+ "severity": 1,
+ "punishment": {
+ "type": "disable",
+ "message": "Your account has been disabled for having a badword in a message."
+ }
+ }
+ ]
+ },
+ "pendingAccounts": {
+ "rules": [
+ {
+ "name": "No probbox spammers",
+ "type": "message_content",
+ "blocked": ["badword", "slur[^p]"],
+ "severity": 1,
+ "punishment": {
+ "type": "reject"
+ }
+ },
+ {
+ "name": "No mewkid spammers",
+ "type": "username_content",
+ "blocked": ["badword", "slur[^p]"],
+ "severity": 1,
+ "punishment": {
+ "type": "reject"
+ }
+ }
+ ]
+ }
+}
+
+@pytest.fixture
+def generate_mockstodon(monkeypatch):
+ """
+ Replace Mastodon.py completely with a custom copy that replicates the functions Ivory uses.
+
+ I hate having to do this but I don't know if there's really a better way to
+ test without doing this...
+ """
+ def _generate_mockstodon(**kwargs):
+ class Mockstodon():
+ # Static values we check in the tests
+ # these are 4-tuples in the form of:
+ # (account id, action, report id, message)
+ moderation_actions = []
+ accounts = []
+ reports = []
+ def __init__(self, **kwargs):
+ # We don't actually use these values internally, we just want to make sure Mastodon.py is getting passed the right stuff
+ assert kwargs.get("access_token") == ivoryconfig['token']
+ assert kwargs.get("api_base_url") == ivoryconfig['instanceURL']
+ def verify_minimum_version(self, version):
+ return True
+ def instance(self):
+ return {
+ "uri": ivoryconfig['instanceURL'],
+ }
+ def account_verify_credentials(self):
+ return {
+ "username": "testuser"
+ }
+ def admin_reports(self):
+ return self.reports
+ def admin_accounts(self, **kwargs):
+ if kwargs.get("status") == "pending":
+ return self.accounts
+ else:
+ assert kwargs.get("status") == "pending"
+ def admin_account_reject(self, acct_id):
+ self.moderation_actions.append((acct_id, "reject", None, None))
+ return
+ def admin_account_moderate(self, acct_id, action, report_id, **kwargs):
+ self.moderation_actions.append((acct_id,action,report_id, kwargs.get("message")))
+ return
+ Mockstodon.accounts = kwargs.get("accounts")
+ Mockstodon.reports = kwargs.get("reports")
+ monkeypatch.setattr(ivory, "Mastodon", Mockstodon)
+ return Mockstodon
+ return _generate_mockstodon
+
+def test_oneshot(generate_mockstodon, report, pending_account):
+ """
+ Verify that Ivory runs.
+ """
+ Mockstodon = generate_mockstodon(reports=[
+ # suspend
+ report(reported={
+ "username": "badword",
+ "account": {
+ "username": "badword"
+ }
+ }),
+ # disable
+ report(statuses=[
+ {
+ "content": "badword"
+ }
+ ]),
+ # clean
+ report()
+ ], accounts=[
+ # clean
+ pending_account(),
+ # reject
+ pending_account(username="badword"),
+ # reject
+ pending_account(message="badword")
+ ])
+ i = ivory.Ivory(ivoryconfig)
+ i.run()
+ assert Mockstodon.moderation_actions == [
+ ('1', 'suspend', '1', None),
+ ('1', 'disable', '1', None),
+ ('1', 'reject', None, None),
+ ('1', 'reject', None, None)
+ ]
diff --git a/tests/test_judge.py b/tests/test_judge.py
new file mode 100644
index 0000000..8a84d97
--- /dev/null
+++ b/tests/test_judge.py
@@ -0,0 +1,164 @@
+import pytest
+from judge import ReportJudge, PendingAccountJudge
+from rules.message_content import rule as MessageContentRule
+from rules.username_content import rule as UsernameContentRule
+
+@pytest.fixture
+def pendingjudge():
+ return PendingAccountJudge([
+ {
+ "name": "Rule 1",
+ "type": "username_content",
+ "blocked": ["evilusername", "malicious", "asshole"],
+ "severity": 1,
+ "punishment": {
+ "type": "silence"
+ }
+ },
+ {
+ "name": "Rule 2",
+ "type": "message_content",
+ "blocked": ["heck", "badword", "slur[^p]"],
+ "severity": 5,
+ "punishment": {
+ "type": "suspend"
+ }
+ }
+ ])
+
+
+@pytest.fixture
+def reportjudge():
+ return ReportJudge([
+ {
+ "name": "Rule 1",
+ "type": "username_content",
+ "blocked": ["evilusername", "malicious", "asshole"],
+ "severity": 1,
+ "punishment": {
+ "type": "silence"
+ }
+ },
+ {
+ "name": "Rule 2",
+ "type": "message_content",
+ "blocked": ["heck", "badword", "slur[^p]"],
+ "severity": 5,
+ "punishment": {
+ "type": "suspend"
+ }
+ }
+ ])
+
+
+def test_report_empty(reportjudge, report):
+ # dummy report (always passes)
+ rpt = report()
+ (punishment, rules_broken) = reportjudge.make_judgement(rpt)
+ assert len(rules_broken) == 0
+    assert punishment is None
+
+def test_report_rule1(reportjudge, report):
+ # bad username (silence)
+ rpt = report(
+ reported={
+ "username": "evilusername",
+ "account": {
+ "username": "evilusername"
+ }
+ }
+ )
+ (punishment, rules_broken) = reportjudge.make_judgement(rpt)
+ assert len(rules_broken) == 1
+ for first_item in rules_broken:
+ assert first_item.name == "Rule 1"
+ assert isinstance(first_item, UsernameContentRule)
+ assert punishment.type == "silence"
+
+def test_report_rule2(reportjudge, report):
+ # bad message (suspend)
+ rpt = report(
+ statuses=[
+ {
+ "content":
+ "this post doesnt have anything objectionable in it haha
"
+ },
+ {
+ "content":
+ "this post contains a badword ban me pls
"
+ }
+ ]
+ )
+ (punishment, rules_broken) = reportjudge.make_judgement(rpt)
+ assert len(rules_broken) == 1
+ for first_item in rules_broken:
+ assert first_item.name == "Rule 2"
+ assert isinstance(first_item, MessageContentRule)
+ assert punishment.type == "suspend"
+
+def test_report_multirule(reportjudge, report):
+    # multiple rule breaks (highest severity takes precedence)
+ rpt = report(
+ reported={
+ "username": "evilusername",
+ "account": {
+ "username": "evilusername"
+ }
+ },
+ statuses=[
+ {
+ "content":
+ "this post doesnt have anything objectionable in it haha
"
+ },
+ {
+ "content":
+ "this post contains a badword ban me pls
"
+ }
+ ]
+ )
+ (punishment, rules_broken) = reportjudge.make_judgement(rpt)
+ assert len(rules_broken) == 2
+ for rule in rules_broken:
+ assert rule.name in ["Rule 1", "Rule 2"]
+ assert isinstance(rule, (MessageContentRule, UsernameContentRule))
+ assert punishment.type == "suspend"
+
+def test_pending_empty(pendingjudge, pending_account):
+ # dummy pending account (always passes)
+ acct = pending_account()
+ (punishment, rules_broken) = pendingjudge.make_judgement(acct)
+ assert len(rules_broken) == 0
+    assert punishment is None
+
+def test_pending_rule1(pendingjudge, pending_account):
+ # bad username (silence)
+ acct = pending_account(username="evilusername")
+ (punishment, rules_broken) = pendingjudge.make_judgement(acct)
+ assert len(rules_broken) == 1
+ for first_item in rules_broken:
+ assert first_item.name == "Rule 1"
+ assert isinstance(first_item, UsernameContentRule)
+ assert punishment.type == "silence"
+
+def test_pending_rule2(pendingjudge, pending_account):
+ # bad message (suspend)
+ acct = pending_account(message="this post contains a badword ban me pls
")
+ (punishment, rules_broken) = pendingjudge.make_judgement(acct)
+ assert len(rules_broken) == 1
+ for first_item in rules_broken:
+ assert first_item.name == "Rule 2"
+ assert isinstance(first_item, MessageContentRule)
+ assert punishment.type == "suspend"
+
+def test_pending_multirule(pendingjudge, pending_account):
+    # multiple rule breaks (highest severity takes precedence)
+ acct = pending_account(
+ username="evilusername",
+ message="this post contains a badword ban me pls
"
+ )
+ (punishment, rules_broken) = pendingjudge.make_judgement(acct)
+ assert len(rules_broken) == 2
+ for rule in rules_broken:
+ assert rule.name in ["Rule 1", "Rule 2"]
+ assert isinstance(rule, (MessageContentRule, UsernameContentRule))
+ assert punishment.type == "suspend"
diff --git a/tests/test_link_content.py b/tests/test_link_content.py
new file mode 100644
index 0000000..91673f2
--- /dev/null
+++ b/tests/test_link_content.py
@@ -0,0 +1,73 @@
+import pytest
+import voluptuous
+from copy import deepcopy
+
+from rules.link_content import rule as Rule
+
+ruleconfig = {
+ "name": "Test rule",
+ "type": "test_type",
+ "blocked": ["evilsi\\.te", "heresa\\.porn\\.domain"],
+ "severity": 1,
+ "punishment": {
+ "type": "suspend"
+ }
+}
+
+
+@pytest.fixture
+def rule():
+ return Rule(ruleconfig)
+
+
+def test_requires_blocked():
+ bad_config = deepcopy(ruleconfig)
+ bad_config.pop("blocked")
+ # bad config no have block. no worky :(
+ with pytest.raises(voluptuous.error.MultipleInvalid):
+ Rule(bad_config)
+
+
+def test_evil_report(rule, report):
+ rpt1 = report(
+ statuses=[
+ {
+ "content":
+                '<p>this post doesnt have any <a href="https://example.com">objectionable links</a> in it haha</p>'  # any non-blocked link works here
+ },
+ {
+ "content":
+                '<p>this post contains a <a href="https://evilsi.te">bad link</a> ban me pls</p>'  # blocked domain
+ }
+ ]
+ )
+ rpt2 = report(
+ statuses=[
+ {
+ "content":
+ "this post doesnt have any links at all in it haha
"
+ },
+ {
+ "content":
+                '<p><a href="https://heresa.porn.domain">gottem</a></p>'  # blocked domain
+ }
+ ]
+ )
+ assert rule.test_report(rpt1)
+ assert rule.test_report(rpt2)
+
+
+def test_good_report(rule, report):
+ rpt = report(
+ statuses=[
+ {
+ "content":
+ "blah blah blah asdjhlg
"
+ },
+ {
+ "content":
+                '<p>check my <a href="https://example.com/mixtape">mixtape</a> yeet</p>'  # non-blocked link
+ }
+ ]
+ )
+ assert not rule.test_report(rpt)
diff --git a/tests/test_link_resolver.py b/tests/test_link_resolver.py
new file mode 100644
index 0000000..e565384
--- /dev/null
+++ b/tests/test_link_resolver.py
@@ -0,0 +1,81 @@
+import pytest
+import voluptuous
+import requests
+from copy import deepcopy
+
+from rules.link_resolver import rule as Rule
+
+ruleconfig = {
+ "name": "Test rule",
+ "type": "test_type",
+ "blocked": ["evilsi\\.te", "heresa\\.porn\\.domain"],
+ "severity": 1,
+ "punishment": {
+ "type": "suspend"
+ }
+}
+
+
+@pytest.fixture
+def rule():
+ return Rule(ruleconfig)
+
+@pytest.fixture
+def head_mock(MockResponse, monkeypatch):
+ def handler(url, *args, **kwargs):
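+        # Fake the final URL each "archive" link redirects to; the rule is
+        # expected to resolve links via requests.head and match the blocked
+        # patterns against the response's final URL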
+ respmap = {
+ "https://example.com/archive/notmaliciousatall/": "https://evilsi.te", # malicious site
+ "https://example.com/archive/definitelynotporn/": "https://heresa.porn.domain", # malicious site
+ "https://example.com/archive/actuallynotmalicious/": "https://example.com" # non-malicious site
+ }
+ return MockResponse(url=respmap[url])
+ monkeypatch.setattr(requests, "head", handler)
+
+
+def test_requires_blocked():
+ bad_config = deepcopy(ruleconfig)
+ bad_config.pop("blocked")
+ # bad config no have block. no worky :(
+ with pytest.raises(voluptuous.error.MultipleInvalid):
+ Rule(bad_config)
+
+def test_evil_report(head_mock, rule, report):
+ # multiple statuses with one malicious
+ rpt1 = report(
+ statuses=[
+ {
+ "content":
+ "blah blah blah asdjhlg
"
+ },
+ {
+ "content":
+                '<p>heres an <a href="https://example.com/archive/notmaliciousatall/">inconspicuous archive link</a></p>'  # resolves to evilsi.te per head_mock
+ }
+ ]
+ )
+ assert rule.test_report(rpt1)
+ rpt2 = report(
+ statuses=[
+ {
+ "content":
+                '<p>yeah uhh <a href="https://example.com/archive/definitelynotporn/">im up to no good</a> lol</p>'  # resolves to heresa.porn.domain per head_mock
+ }
+ ]
+ )
+ assert rule.test_report(rpt2)
+
+def test_good_report(head_mock, rule, report):
+    # multiple statuses, none resolving anywhere malicious
+ rpt = report(
+ statuses=[
+ {
+ "content":
+ "blah blah blah asdjhlg
"
+ },
+ {
+ "content":
+                '<p>heres an <a href="https://example.com/archive/actuallynotmalicious/">actually ok archive link</a></p>'  # resolves to example.com per head_mock
+ }
+ ]
+ )
+ assert not rule.test_report(rpt)
diff --git a/tests/test_message_content.py b/tests/test_message_content.py
new file mode 100644
index 0000000..9a176c5
--- /dev/null
+++ b/tests/test_message_content.py
@@ -0,0 +1,85 @@
+import pytest
+import voluptuous
+from copy import deepcopy
+
+from rules.message_content import rule as Rule
+
+ruleconfig = {
+ "name": "Test rule",
+ "type": "test_type",
+ # this second one will block "slur" but not "slurp".
+ # just gotta be sure regex is working
+ "blocked": ["badword", "slur[^p]"],
+ "severity": 1,
+ "punishment": {
+ "type": "suspend"
+ }
+}
+
+
+@pytest.fixture
+def rule():
+ return Rule(ruleconfig)
+
+
+def test_requires_blocked():
+ bad_config = deepcopy(ruleconfig)
+ bad_config.pop("blocked")
+ # bad config no have block. no worky :(
+ with pytest.raises(voluptuous.error.MultipleInvalid):
+ Rule(bad_config)
+
+
+def test_evil_report(rule, report):
+ rpt1 = report(
+ statuses=[
+ {
+ "content":
+ "this post doesnt have anything objectionable in it haha
"
+ },
+ {
+ "content":
+ "this post contains a badword ban me pls
"
+ }
+ ]
+ )
+ rpt2 = report(
+ statuses=[
+ {
+ "content":
+ "this post doesnt have anything objectionable in it haha
"
+ },
+ {
+ "content":
+ "slurslurslurslurslur
"
+ }
+ ]
+ )
+ assert rule.test_report(rpt1)
+ assert rule.test_report(rpt2)
+
+def test_good_report(rule, report):
+ rpt = report(
+ statuses=[
+ {
+ "content":
+ "blah blah slurp asdjhlg
"
+ },
+ {
+ "content":
+ "im just fuckibg POSTING,,,
"
+ }
+ ]
+ )
+ assert not rule.test_report(rpt)
+
+def test_evil_pending_account(rule, pending_account):
+ acct1 = pending_account(message="lol slur slur slur you'll still let me in do it u wont")
+ assert rule.test_pending_account(acct1)
+ acct2 = pending_account(message="buy badword penis pills :joylmfao:")
+ assert rule.test_pending_account(acct2)
+
+def test_good_pending_account(rule, pending_account):
+ acct = pending_account(message="slurpeedrinker26") # of no relation to slurpeedrinker28
+ assert not rule.test_pending_account(acct)
+
diff --git a/tests/test_stopforumspam.py b/tests/test_stopforumspam.py
new file mode 100644
index 0000000..1a070e1
--- /dev/null
+++ b/tests/test_stopforumspam.py
@@ -0,0 +1,94 @@
+import pytest
+import voluptuous
+import requests # for mocking
+from copy import deepcopy
+
+from rules.stopforumspam import rule as Rule
+
+ruleconfig = {
+ "name": "StopForumSpam test",
+ "type": "stopforumspam",
+ "threshold": 90,
+ "severity": 1,
+ "punishment": {
+ "type": "reject"
+ }
+}
+
+
+@pytest.fixture
+def rule():
+ return Rule(ruleconfig)
+
+@pytest.fixture
+def sfs_response():
+ def _sfs_response(ip_conf=None, email_conf=None):
+ # The rule currently only uses confidence internally, but we
+ # simulate a full response just in case
+ resp = {
+ "success": 1,
+ "ip": {
+ "frequency": 10,
+ "appears": 1
+ },
+ "email": {
+ "frequency": 10,
+ "appears": 1
+ }
+ }
+ if ip_conf:
+ resp['ip']['confidence'] = ip_conf
+ if email_conf:
+ resp['email']['confidence'] = email_conf
+ return resp
+ return _sfs_response
+
+@pytest.fixture
+def sfs_mock(monkeypatch, MockResponse, sfs_response):
+ """
+ Expose a way to mock the "requests" library so that we can test inputs.
+ """
+ def _sfs_mock(ip=None, email=None):
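+        # any GET (i.e. the StopForumSpam API lookup) returns our canned confidence values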
+ def handler(*args, **kwargs):
+ return MockResponse(json=sfs_response(ip, email))
+ monkeypatch.setattr(requests, "get", handler)
+ return _sfs_mock
+
+def test_requires_threshold():
+ bad_config = deepcopy(ruleconfig)
+ bad_config.pop("threshold")
+ # bad config no have threshold. no worky :(
+ with pytest.raises(voluptuous.error.MultipleInvalid):
+ Rule(bad_config)
+
+def test_evil_pending_account_iponly(rule, pending_account, sfs_mock):
+ acct = pending_account()
+ sfs_mock(ip=90)
+ assert rule.test_pending_account(acct)
+
+def test_good_pending_account_iponly(rule, pending_account, sfs_mock):
+ acct = pending_account()
+ # probably not *actually* good but not confident enough for us to act on,
+ # of course
+ sfs_mock(ip=89)
+ assert not rule.test_pending_account(acct)
+
+def test_evil_pending_account_emailonly(rule, pending_account, sfs_mock):
+ acct = pending_account()
+ sfs_mock(email=90)
+ assert rule.test_pending_account(acct)
+
+def test_good_pending_account_emailonly(rule, pending_account, sfs_mock):
+ acct = pending_account()
+ sfs_mock(email=89)
+ assert not rule.test_pending_account(acct)
+
+def test_evil_pending_account(rule, pending_account, sfs_mock):
+ acct = pending_account()
+ sfs_mock(ip=85, email=99)
+ assert rule.test_pending_account(acct)
+
+def test_good_pending_account(rule, pending_account, sfs_mock):
+ acct = pending_account()
+ sfs_mock(ip=80, email=89)
+ assert not rule.test_pending_account(acct)
diff --git a/tests/test_username_content.py b/tests/test_username_content.py
new file mode 100644
index 0000000..d82ec98
--- /dev/null
+++ b/tests/test_username_content.py
@@ -0,0 +1,77 @@
+import pytest
+import voluptuous
+from copy import deepcopy
+
+from rules.username_content import rule as Rule
+
+ruleconfig = {
+ "name": "Test rule",
+ "type": "test_type",
+ # this second one will block "slur" but not "slurp".
+ # just gotta be sure regex is working
+ "blocked": ["badword", "slur[^p]"],
+ "severity": 1,
+ "punishment": {
+ "type": "suspend"
+ }
+}
+
+
+@pytest.fixture
+def rule():
+ return Rule(ruleconfig)
+
+
+def test_requires_blocked():
+ bad_config = deepcopy(ruleconfig)
+ bad_config.pop("blocked")
+ # bad config no have block. no worky :(
+ with pytest.raises(voluptuous.error.MultipleInvalid):
+ Rule(bad_config)
+
+
+def test_evil_report(rule, report):
+    # The rule itself actually reads the username from the nested account
+    # object (not the admin account one), but we set both here for
+    # completeness' sake.
+ rpt1 = report(
+ reported={
+ "username": "slurmaster9000",
+ "account": {
+ "username": "slurmaster9000"
+ }
+ }
+ )
+ rpt2 = report(
+ reported={
+ "username": "xX_badword_Xx",
+ "account": {
+ "username": "xX_badword_Xx"
+ }
+ }
+ )
+ assert rule.test_report(rpt1)
+ assert rule.test_report(rpt2)
+
+def test_good_report(rule, report):
+ rpt = report(
+ reported={
+ # ffs corporate shill
+ "username": "slurpeedrinker28",
+ "account": {
+ "username": "slurpeedrinker28"
+ }
+ }
+ )
+ assert not rule.test_report(rpt)
+
+def test_evil_pending_account(rule, pending_account):
+ acct1 = pending_account(username="slurtroll")
+ assert rule.test_pending_account(acct1)
+ acct2 = pending_account(username="badwordyfiend")
+ assert rule.test_pending_account(acct2)
+
+def test_good_pending_account(rule, pending_account):
+ acct = pending_account(message="slurpeedrinker26") # of no relation to slurpeedrinker28
+ assert not rule.test_pending_account(acct)
+
diff --git a/util.py b/util.py
new file mode 100644
index 0000000..340bf0d
--- /dev/null
+++ b/util.py
@@ -0,0 +1,27 @@
+"""
+Utilities for Ivory operations.
+"""
+from bs4 import BeautifulSoup
+from typing import List
+
+def parse_links(text: str) -> List[str]:
+ """
+ Parse all links out of an HTML string.
+ Currently using BeautifulSoup for this.
+
+ Used primarily when getting links out of a status or bio.
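+
+    Example:
+        >>> parse_links('<p><a href="https://example.com">hi</a></p>')
+        ['https://example.com']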
+
+    TODO: Make parsing read non-<a> links?
+ TODO: Exclude mentions? The "mention" class is not a good way to filter that...
+ """
+ return [a.get('href') for a in BeautifulSoup(text, "html.parser").find_all('a')]
+
+def parse_links_from_statuses(statuses: List[dict]) -> List[str]:
+ """
+ Get a list of all links out of an array of Mastodon statuses.
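+
+    Example:
+        >>> parse_links_from_statuses([
+        ...     {"content": '<p><a href="https://example.com">hi</a></p>'},
+        ...     {"content": "<p>no links here</p>"}
+        ... ])
+        ['https://example.com']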
+ """
+ links = []
+ for status in statuses:
+ for link in parse_links(status['content']):
+ links.append(link)
+ return links