-
Notifications
You must be signed in to change notification settings - Fork 201
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Addition of Amazon ECS job, copied code from luigi.contrib.ecs.py and… #13
Open
rhaarm
wants to merge
7
commits into
Nextdoor:master
Choose a base branch
from
rhaarm:master
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
18c5536
Addition of Amazon ECS job, copied code from luigi.contrib.ecs.py and…
rhaarm 1ede3cb
Addition of Amazon ECS job, copied code from luigi.contrib.ecs.py and…
rhaarm 150ff2e
Addition of Amazon ECS job, copied code from luigi.contrib.ecs.py and…
rhaarm bdd50e3
Addition of Amazon ECS job, copied code from luigi.contrib.ecs.py and…
rhaarm 2be8ad3
add a 7 day window, will need to eventually build a true time picker
rhaarm 64f9ff9
add a 7 day window, will need to eventually build a true time picker
rhaarm a48ff8e
update for ecs_job better error handling
rhaarm File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,142 @@ | ||
"""A sample job that prints string.""" | ||
|
||
import time | ||
import logging | ||
import boto3 | ||
from ndscheduler import job | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
client = boto3.client('ecs') | ||
|
||
POLL_TIME = 2 | ||
|
||
|
||
class ECSFailureException(BaseException): | ||
pass | ||
|
||
|
||
class ECSResponseException(BaseException): | ||
pass | ||
|
||
|
||
class ECSJob(job.JobBase): | ||
retry_count = 3 | ||
|
||
@classmethod | ||
def meta_info(cls): | ||
return { | ||
'job_class_string': '%s.%s' % (cls.__module__, cls.__name__), | ||
'notes': 'This will execute a AWS ECS RunTask!', | ||
'arguments': [ | ||
{'type': 'string', 'description': 'ECS Cluster to run on'}, | ||
{'type': 'string', 'description': 'task_def_arn'}, | ||
{'type': 'array[dict]', 'description': 'task_def'}, | ||
{'type': 'string', 'description': 'Directly corresponds to the ' | ||
'`overrides` parameter of runTask API'} | ||
], | ||
'example_arguments': '["ClusterName", None, "arn:aws:ecs:<region>' | ||
':<user_id>:task-definition/<family>:<tag>", None]' | ||
} | ||
|
||
def _get_task_statuses(self, task_ids): | ||
""" | ||
Retrieve task statuses from ECS API | ||
|
||
Returns list of {RUNNING|PENDING|STOPPED} for each id in task_ids | ||
""" | ||
logger.debug('Get status of task_ids: {}'.format(task_ids)) | ||
response = client.describe_tasks(tasks=task_ids, cluster=self.cluster) | ||
|
||
# Error checking | ||
if response['failures']: | ||
raise ECSFailureException('There were some failures:\n{0}'.format( | ||
response['failures'])) | ||
status_code = response['ResponseMetadata']['HTTPStatusCode'] | ||
if status_code != 200: | ||
msg = 'Task status request received status code {0}:\n{1}' | ||
raise ECSResponseException(msg.format(status_code, response)) | ||
|
||
return [t['lastStatus'] for t in response['tasks']] | ||
|
||
def get_task_statuses(self, task_ids): | ||
retries = 0 | ||
while True: | ||
try: | ||
return self._get_task_statuses(task_ids) | ||
except ECSResponseException as e: | ||
if retries <= self.retry_count: | ||
msg = 'Response failed retry attempt {}/{}'.format(retries, self.retry_count) | ||
logger.warning(msg) | ||
time.sleep(POLL_TIME) | ||
else: | ||
raise | ||
|
||
def _track_tasks(self, task_ids): | ||
"""Poll task status until STOPPED""" | ||
while True: | ||
statuses = self.get_task_statuses(task_ids) | ||
|
||
if all([status == 'STOPPED' for status in statuses]): | ||
logger.info('ECS tasks {0} STOPPED'.format(','.join(task_ids))) | ||
break | ||
time.sleep(POLL_TIME) | ||
logger.debug('ECS task status for tasks {0}: {1}'.format( | ||
','.join(task_ids), statuses)) | ||
|
||
@property | ||
def cluster(self): | ||
if not hasattr(self, '_cluster'): | ||
logger.warning('Cluster not set!') | ||
return None | ||
return self._cluster | ||
|
||
@cluster.setter | ||
def cluster(self, cluster): | ||
self._cluster = cluster | ||
logger.debug('Set Cluster: {}'.format(cluster)) | ||
|
||
def run(self, cluster, task_def_arn=None, task_def=None, command=None, *args, **kwargs): | ||
self.cluster = cluster | ||
if (not task_def and not task_def_arn) or \ | ||
(task_def and task_def_arn): | ||
raise ValueError(('Either (but not both) a task_def (dict) or' | ||
'task_def_arn (string) must be assigned')) | ||
if not task_def_arn: | ||
# Register the task and get assigned taskDefinition ID (arn) | ||
response = client.register_task_definition(**task_def) | ||
task_def_arn = response['taskDefinition']['taskDefinitionArn'] | ||
logger.debug('Task Definition ARN: {}'.format(task_def_arn)) | ||
|
||
# Submit the task to AWS ECS and get assigned task ID | ||
# (list containing 1 string) | ||
if command: | ||
overrides = {'containerOverrides': command} | ||
else: | ||
overrides = {} | ||
response = client.run_task(taskDefinition=task_def_arn, | ||
overrides=overrides, cluster=self.cluster) | ||
_task_ids = [task['taskArn'] for task in response['tasks']] | ||
|
||
# Wait on task completion | ||
self._track_tasks(_task_ids) | ||
|
||
|
||
if __name__ == "__main__": | ||
# You can easily test this job here | ||
job = ECSJob.create_test_instance() | ||
job.run('ClusterName', "arn:aws:ecs:<region>:<user_id>:task-" | ||
"definition/<task_def_name>:<revision_number>") | ||
job.run('DataETLCluster', None, { | ||
'family': 'hello-world', | ||
'volumes': [], | ||
'containerDefinitions': [ | ||
{ | ||
'memory': 1, | ||
'essential': True, | ||
'name': 'hello-world', | ||
'image': 'ubuntu', | ||
'command': ['/bin/echo', 'hello world'] | ||
} | ||
] | ||
}) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Change this comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How do people pass AWS credential? Better add some operational guidelines here, e.g., create dot file on worker nodes for aws credential .