Skip to content

Python module for running map-reduce jobs on ECS-watchbot

License

Notifications You must be signed in to change notification settings

mapbox/watchbot-progress-py

Repository files navigation

watchbot-progress-py

Build Status Coverage Status PyPI version

watchbot-progress-py is a Python module for tracking progress of map-reduce jobs on ECS watchbot

ECS watchbot's reduce mode allows you to break up a large job into smaller parts, process them individually, and roll them up into a final result. This involves tracking the state of the job using a database. Node and command line users can use watchbot-progress for this task. This library brings similar functionality to Python users and provides a high-level interface which cleanly separates reduce mode logic from your processing code.

Usage

1. Make a list of parts

Each Part is a dictionary containing information about a portion of the job. For example, a part could represent the URL to a file to be processed. The part dict must be JSON-serializable.

parts = [
    {'url': 'https://a.com'},
    {'url': 'https://b.com'},
    {'url': 'https://c.com'}]

2. Start the job

The create_job function takes a list of parts and starts off the process:

  • Adds a partid and jobid to each part dictionary.
  • Writes the parts and metadata to the backend database which tracks the progress of the overall job.
  • Sends an SNS message with a Subject=map and with a Message: the json representation of the part dictionary including partid and jobid.
  • The jobid is returned.
from watchbot_progress import create_job

jobid = create_job(parts)

3. Process each part

In your distributed processing code, the code which receives the Subject=map SNS message, you can use the Part context manager to handle the reduce mode accounting.

  • In the case that the context block succeeds:

    • the part will be marked as complete in the backend database
    • the total remaining parts is queried
    • if there are no remaining parts, an SNS message with Subject=reduce is sent.
  • If the context block fails:

    • By default, the failure will not affect the backend database or job status; all remaining parts will be processed and external retry logic applies.
    • If you pass the optional fail_job_on parameter, you can specify a list of Exception types which will cause the job to fail. All subqeuent parts will be skipped in the event of a job failure.
    • The original exception is re-raised from the context manager
from watchbot_progress import Part

message = json.loads(os.environ['Message'])
jobid = message['jobid']
partid = message['partid']

with Part(jobid, partid):
    # The context block
    process_url(message['url'])

Backend Databases

Since version 0.5, multiple backend databases are supported.

  • Dynamodb is the default. It is suitable for jobs with relatively small part counts (less than 10,000) and easy to administer via AWS tools.
    • If the topic_arn is not specified, the SNS topic from the WorkTopic environment variable.
    • If the table_arn is not specified, the DynamoDB table will be determined from the ProgressTable environment variable.
  • Redis requires more administration but is highly performant and scales well.
    • Can specify the host, port and db for the Redis connection which defaults to localhost, 6379 and 0 respectively.
    • If the topic_arn is not specified, the SNS topic from the WorkTopic environment variable.

These backends can be used by creating an instance of the desired class and passing it as the progress argument.

from watchbot_progress.backends.redis import RedisProgress

p = RedisProgress(host='localhost', port=6379)

create_job(parts, progress=p)

# ... and on the processing side

with Part(partid, jobid, progress=p):
    process_url(message['url'])

For more information about writing a backend database, see docs/WatchbotProgress-interface.md

Command line interface

The watchbot-progress-py command is provided to check the status of jobs in a table.

$ watchbot-progress-py --help
Usage: watchbot-progress-py [OPTIONS] COMMAND [ARGS]...

  Main click command

Options:
  --help  Show this message and exit.

Commands:
  info     Returns the status of a specific jobid for a...
  ls       Scans the database for jobs and lists them as...
  pending  Streams out all pending part numbers for a...

About

Python module for running map-reduce jobs on ECS-watchbot

Topics

Resources

License

Code of conduct

Security policy

Stars

Watchers

Forks

Packages

No packages published

Contributors 4

  •  
  •  
  •  
  •  

Languages