Skip to content

Commit

Permalink
fix(sts-job-manager): Logging Configuration (#860)
Browse files (browse the repository at this point in the history)
* fix(sts-job-manager): Fix accessing `objectConditions`

Not all operations have `objectConditions` - for example, when a job was previously created outside of the STS Job Manager.

* fix: logging configuration

Co-authored-by: Ludovico Magnocavallo <ludomagno@google.com>
  • Loading branch information
d-goog and ludoo authored Oct 6, 2022
1 parent b61e670 commit feedac7
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 15 deletions.
1 change: 1 addition & 0 deletions tools/sts-job-manager/lib/table_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from lib.options import BigQueryOptions
from lib.services import Services

logging.basicConfig()
logger = logging.getLogger(__name__)
logger.setLevel(os.environ.get("LOGLEVEL", "INFO").upper())

Expand Down
1 change: 1 addition & 0 deletions tools/sts-job-manager/pause_all_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from lib.options import STSJobManagerOptions
from lib.services import Services

logging.basicConfig()
logger = logging.getLogger(__name__)
logger.setLevel(os.environ.get("LOGLEVEL", "INFO").upper())

Expand Down
1 change: 1 addition & 0 deletions tools/sts-job-manager/prepare_tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from lib.services import Services
from lib.table_util import create_dataset, create_table, get_table_ref

logging.basicConfig()
logger = logging.getLogger(__name__)
logger.setLevel(os.environ.get("LOGLEVEL", "INFO").upper())

Expand Down
1 change: 1 addition & 0 deletions tools/sts-job-manager/remove_successful_one_time_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from constants.status import STATUS, sts_operation_status_to_table_status
from lib.services import Services

logging.basicConfig()
logger = logging.getLogger(__name__)
logger.setLevel(os.environ.get("LOGLEVEL", "INFO").upper())

Expand Down
31 changes: 16 additions & 15 deletions tools/sts-job-manager/sts_job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from lib.services import Services
from lib.table_util import get_table_identifier, get_table_ref

logging.basicConfig()
logger = logging.getLogger(__name__)
logger.setLevel(os.environ.get("LOGLEVEL", "INFO").upper())

Expand Down Expand Up @@ -288,7 +289,7 @@ def manage_state(services: Services, options: STSJobManagerOptions):
Gathers all prefix information from both STS and the database, then updates
the corresponding rows where necessary.
"""
logging.info('Checking state...')
logger.info('Checking state...')

# jobs from the database
jobs = get_jobs_by_prefix(services, options)
Expand Down Expand Up @@ -529,36 +530,36 @@ def num_new_jobs_to_run():
double_current_job_count = current_running_jobs * 2

if not pending_job_count:
logging.info('No jobs available to run')
logger.info('No jobs available to run')
return 0
elif current_running_jobs > options.max_concurrent_jobs:
logging.info(f'Will not create any new jobs - too many are running \
logger.info(f'Will not create any new jobs - too many are running \
(current = {current_running_jobs}, \
max = {options.max_concurrent_jobs})')
return 0
elif current_running_jobs == 0 and \
max_number_jobs_available_to_run > 0:
logging.info(
logger.info(
'Will prepare initial job, as no other jobs are running')
return 1
else:
logging.info('Ramping up job count')
logger.info('Ramping up job count')
return min(max_number_jobs_available_to_run,
double_current_job_count)

logging.info('Managing jobs...')
logger.info('Managing jobs...')

count = num_new_jobs_to_run()

if not count:
logging.info('...no new jobs to run.')
logger.info('...no new jobs to run.')
return

logging.info(f'...spinning up to {count} new job(s)...')
logger.info(f'...spinning up to {count} new job(s)...')

run_jobs(count, services, options)

logging.info('...done running jobs.')
logger.info('...done running jobs.')


def publish_heartbeat(jobs: Dict[str, Job], last_jobs: Dict[str, Job],
Expand All @@ -571,7 +572,7 @@ def publish_heartbeat(jobs: Dict[str, Job], last_jobs: Dict[str, Job],
def publish_timeseries_heartbeat(name: str, value: int, services: Services,
project_name: str,
monitoring_types=monitoring_v3.types):
logging.info(f'Preparing heartbeat for `{name}` (value: {value})...')
logger.info(f'Preparing heartbeat for `{name}` (value: {value})...')

series = monitoring_types.TimeSeries()
series.metric.type = name
Expand All @@ -582,14 +583,14 @@ def publish_timeseries_heartbeat(name: str, value: int, services: Services,

services.monitoring.create_time_series(project_name, [series])

logging.info(f'...published heartbeat `{name}`.')
logger.info(f'...published heartbeat `{name}`.')

p = options.stackdriver_project if options.stackdriver_project \
else services.bigquery.project

monitoring_project_name = services.monitoring.project_path(p)

logging.info(f'Preparing heartbeats for `{monitoring_project_name}`...')
logger.info(f'Preparing heartbeats for `{monitoring_project_name}`...')

status_count: Dict[str, int] = {}
stalled_count = 0
Expand All @@ -616,7 +617,7 @@ def publish_timeseries_heartbeat(name: str, value: int, services: Services,
name, count, services, monitoring_project_name, monitoring_types)

for job in determine_stalled_jobs(jobs, last_jobs):
logging.warn(f'Job `{job.job_name}` appears to be stalled.')
logger.warn(f'Job `{job.job_name}` appears to be stalled.')
stalled_count += 1

# Publish stalled count
Expand All @@ -625,7 +626,7 @@ def publish_timeseries_heartbeat(name: str, value: int, services: Services,
stalled_metric, stalled_count, services, monitoring_project_name,
monitoring_types)

logging.info('...done publishing heartbeats.')
logger.info('...done publishing heartbeats.')


def interval(services: Services, options: STSJobManagerOptions):
Expand All @@ -640,7 +641,7 @@ def interval(services: Services, options: STSJobManagerOptions):
jobs: Dict[str, Job] = {}

while True:
logging.info(f'Running main interval #{interval_count}...')
logger.info(f'Running main interval #{interval_count}...')
start = time.time()

job_timeout = start - last_manage_jobs >= options.job_interval
Expand Down

0 comments on commit feedac7

Please sign in to comment.