Skip to content

Commit

Permalink
Fixes for envoy (#12392)
Browse files Browse the repository at this point in the history
* [ci] Use fully-qualified domain names for in-cluster services

* fix envoy certificates for internal namespaces

* fix path matching so the /foo/batch route no longer also matches /foo/batch-driver

* [batch] Examine the response body for error messages in tests
  • Loading branch information
daniel-goldstein authored Oct 31, 2022
1 parent 4a52af9 commit 8379a6a
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 33 deletions.
38 changes: 19 additions & 19 deletions batch/test/test_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
import time
from typing import Set

import aiohttp
import pytest

from hailtop import httpx
from hailtop.auth import service_auth_headers
from hailtop.batch_client.client import BatchClient
from hailtop.config import get_deploy_config, get_user_config
Expand Down Expand Up @@ -107,14 +107,14 @@ def test_invalid_resource_requests(client: BatchClient):
bb = client.create_batch()
resources = {'cpu': '1', 'memory': '250Gi', 'storage': '1Gi'}
bb.create_job(DOCKER_ROOT_IMAGE, ['true'], resources=resources)
with pytest.raises(aiohttp.client.ClientResponseError, match='resource requests.*unsatisfiable'):
with pytest.raises(httpx.ClientResponseError, match='resource requests.*unsatisfiable'):
bb.submit()

bb = client.create_batch()
resources = {'cpu': '0', 'memory': '1Gi', 'storage': '1Gi'}
bb.create_job(DOCKER_ROOT_IMAGE, ['true'], resources=resources)
with pytest.raises(
aiohttp.client.ClientResponseError,
httpx.ClientResponseError,
match='bad resource request for job.*cpu must be a power of two with a min of 0.25; found.*',
):
bb.submit()
Expand All @@ -123,7 +123,7 @@ def test_invalid_resource_requests(client: BatchClient):
resources = {'cpu': '0.1', 'memory': '1Gi', 'storage': '1Gi'}
bb.create_job(DOCKER_ROOT_IMAGE, ['true'], resources=resources)
with pytest.raises(
aiohttp.client.ClientResponseError,
httpx.ClientResponseError,
match='bad resource request for job.*cpu must be a power of two with a min of 0.25; found.*',
):
bb.submit()
Expand All @@ -132,21 +132,21 @@ def test_invalid_resource_requests(client: BatchClient):
resources = {'cpu': '0.25', 'memory': 'foo', 'storage': '1Gi'}
bb.create_job(DOCKER_ROOT_IMAGE, ['true'], resources=resources)
with pytest.raises(
aiohttp.client.ClientResponseError,
httpx.ClientResponseError,
match=".*.resources.memory must match regex:.*.resources.memory must be one of:.*",
):
bb.submit()

bb = client.create_batch()
resources = {'cpu': '0.25', 'memory': '500Mi', 'storage': '10000000Gi'}
bb.create_job(DOCKER_ROOT_IMAGE, ['true'], resources=resources)
with pytest.raises(aiohttp.client.ClientResponseError, match='resource requests.*unsatisfiable'):
with pytest.raises(httpx.ClientResponseError, match='resource requests.*unsatisfiable'):
bb.submit()

bb = client.create_batch()
resources = {'storage': '10000000Gi', 'machine_type': smallest_machine_type(CLOUD)}
bb.create_job(DOCKER_ROOT_IMAGE, ['true'], resources=resources)
with pytest.raises(aiohttp.client.ClientResponseError, match='resource requests.*unsatisfiable'):
with pytest.raises(httpx.ClientResponseError, match='resource requests.*unsatisfiable'):
bb.submit()


Expand Down Expand Up @@ -403,7 +403,7 @@ def test_deleted_job_log(client: BatchClient):

try:
j.log()
except aiohttp.ClientResponseError as e:
except httpx.ClientResponseError as e:
if e.status == 404:
pass
else:
Expand All @@ -419,7 +419,7 @@ def test_delete_batch(client: BatchClient):
# verify doesn't exist
try:
client.get_job(*j.id)
except aiohttp.ClientResponseError as e:
except httpx.ClientResponseError as e:
if e.status == 404:
pass
else:
Expand All @@ -443,7 +443,7 @@ def test_cancel_batch(client: BatchClient):
# cancelled job has no log
try:
j.log()
except aiohttp.ClientResponseError as e:
except httpx.ClientResponseError as e:
if e.status == 404:
pass
else:
Expand All @@ -453,7 +453,7 @@ def test_cancel_batch(client: BatchClient):
def test_get_nonexistent_job(client: BatchClient):
try:
client.get_job(1, 666)
except aiohttp.ClientResponseError as e:
except httpx.ClientResponseError as e:
if e.status == 404:
pass
else:
Expand Down Expand Up @@ -718,7 +718,7 @@ def test_duplicate_parents(client: BatchClient):
bb.create_job(DOCKER_ROOT_IMAGE, command=['echo', 'tail'], parents=[head, head])
try:
batch = bb.submit()
except aiohttp.ClientResponseError as e:
except httpx.ClientResponseError as e:
assert e.status == 400
else:
assert False, f'should receive a 400 Bad Request {batch.id}'
Expand Down Expand Up @@ -961,9 +961,9 @@ def test_verify_private_network_is_restricted(client: BatchClient):
)
try:
bb.submit()
except aiohttp.ClientResponseError as err:
except httpx.ClientResponseError as err:
assert err.status == 400
assert 'unauthorized network private' in err.message
assert 'unauthorized network private' in err.body
else:
assert False

Expand Down Expand Up @@ -1258,9 +1258,9 @@ def test_update_cancelled_batch_wout_fast_path(client: BatchClient):
for _ in range(4):
bb.create_job(DOCKER_ROOT_IMAGE, ['echo', 'a' * (900 * 1024)])
b = bb.submit()
except aiohttp.ClientResponseError as err:
except httpx.ClientResponseError as err:
assert err.status == 400
assert 'Cannot submit new jobs to a cancelled batch' in err.message
assert 'Cannot submit new jobs to a cancelled batch' in err.body
else:
assert False

Expand All @@ -1274,9 +1274,9 @@ def test_submit_update_to_cancelled_batch(client: BatchClient):
try:
bb.create_job(DOCKER_ROOT_IMAGE, ['true'])
b = bb.submit()
except aiohttp.ClientResponseError as err:
except httpx.ClientResponseError as err:
assert err.status == 400
assert 'Cannot submit new jobs to a cancelled batch' in err.message
assert 'Cannot submit new jobs to a cancelled batch' in err.body
else:
assert False

Expand All @@ -1290,7 +1290,7 @@ def test_submit_update_to_deleted_batch(client: BatchClient):
try:
bb.create_job(DOCKER_ROOT_IMAGE, ['true'])
b = bb.submit()
except aiohttp.ClientResponseError as err:
except httpx.ClientResponseError as err:
assert err.status == 404
else:
assert False
2 changes: 1 addition & 1 deletion ci/ci/ci.py
Original file line number Diff line number Diff line change
Expand Up @@ -741,7 +741,7 @@ async def on_startup(app):
kubernetes_asyncio.config.load_incluster_config()
k8s_client = kubernetes_asyncio.client.CoreV1Api()
app['task_manager'].ensure_future(periodically_call(10, update_envoy_configs, app['db'], k8s_client))
app['task_manager'].ensure_future(periodically_call(10, cleanup_expired_namespaces, app['db'], k8s_client))
app['task_manager'].ensure_future(periodically_call(10, cleanup_expired_namespaces, app['db']))


async def on_cleanup(app):
Expand Down
44 changes: 31 additions & 13 deletions ci/ci/envoy.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def gateway_internal_host(services_per_namespace: Dict[str, List[str]]) -> dict:
'domains': [f'internal.{DOMAIN}'],
'routes': [
{
'match': {'prefix': f'/{namespace}/{service}'},
'match': {'path_separated_prefix': f'/{namespace}/{service}'},
'route': route_to_cluster(f'{namespace}-{service}'),
'typed_per_filter_config': {
'envoy.filters.http.local_ratelimit': rate_limit_config(),
Expand Down Expand Up @@ -144,7 +144,7 @@ def internal_gateway_internal_host(services_per_namespace: Dict[str, List[str]])
'domains': ['internal.hail'],
'routes': [
{
'match': {'prefix': f'/{namespace}/{service}'},
'match': {'path_separated_prefix': f'/{namespace}/{service}'},
'route': route_to_cluster(f'{namespace}-{service}'),
'typed_per_filter_config': {
'envoy.filters.http.local_ratelimit': rate_limit_config(),
Expand Down Expand Up @@ -209,17 +209,21 @@ def clusters(
clusters.append(browser_cluster)
clusters.append(static_cluster)
else:
clusters.append(make_cluster(service, f'{service}.default', proxy, verify_ca=True))
clusters.append(make_cluster(service, f'{service}.default.svc.cluster.local', proxy, verify_ca=True))

for namespace, services in internal_services_per_namespace.items():
for service in services:
clusters.append(make_cluster(f'{namespace}-{service}', f'{service}.{namespace}', proxy, verify_ca=False))
clusters.append(
make_cluster(
f'{namespace}-{service}', f'{service}.{namespace}.svc.cluster.local', proxy, verify_ca=False
)
)

return clusters


def make_cluster(name: str, address: str, proxy: str, *, verify_ca: bool) -> dict:
cluster = {
return {
'@type': 'type.googleapis.com/envoy.config.cluster.v3.Cluster',
'name': name,
'type': 'STRICT_DNS',
Expand All @@ -243,7 +247,13 @@ def make_cluster(name: str, address: str, proxy: str, *, verify_ca: bool) -> dic
}
],
},
'transport_socket': {
'transport_socket': upstream_transport_socket(proxy, verify_ca),
}


def upstream_transport_socket(proxy: str, verify_ca: bool) -> dict:
if verify_ca:
return {
'name': 'envoy.transport_sockets.tls',
'typed_config': {
'@type': 'type.googleapis.com/envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext',
Expand All @@ -252,17 +262,25 @@ def make_cluster(name: str, address: str, proxy: str, *, verify_ca: bool) -> dic
{
'certificate_chain': {'filename': f'/ssl-config/{proxy}-cert.pem'},
'private_key': {'filename': f'/ssl-config/{proxy}-key.pem'},
}
]
},
],
},
'validation_context': {'filename': f'/ssl-config/{proxy}-outgoing.pem'},
},
}

return {
'name': 'envoy.transport_sockets.tls',
'typed_config': {
'@type': 'type.googleapis.com/envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext',
'common_tls_context': {
'tls_certificates': [],
},
'validation_context': {
'trust_chain_verification': 'ACCEPT_UNTRUSTED',
},
},
}
if verify_ca:
cluster['transport_socket']['typed_config']['validation_context'] = { # type: ignore
'trusted_ca': {'filename': f'/ssl-config/{proxy}-outgoing.pem'},
}
return cluster


if __name__ == '__main__':
Expand Down

0 comments on commit 8379a6a

Please sign in to comment.