Skip to content

Commit

Permalink
new unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
kowh-ai committed Jan 27, 2025
1 parent 42e6052 commit 56f840b
Showing 1 changed file with 58 additions and 17 deletions.
75 changes: 58 additions & 17 deletions ckanext/xloader/tests/test_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,24 +91,65 @@ def test_xloader_data_into_datastore(self, cli, data):
resource = helpers.call_action("resource_show", id=data["metadata"]["resource_id"])
assert resource["datastore_contains_all_records_of_source_file"]

def test_external_url_download(self, cli, data):
# Modify the data to use an external URL
data['metadata']['original_url'] = 'https://example.com/test.csv'

# Create a mock response for the external URL
def mock_external_response(download_url, headers):
resp = Response()
resp.raw = io.BytesIO(_TEST_FILE_CONTENT.encode())
resp.headers = {'content-type': 'text/csv'}
return resp

self.enqueue(jobs.xloader_data_into_datastore, [data])
with mock.patch("ckanext.xloader.jobs.get_response", mock_external_response):
stdout = cli.invoke(ckan, ["jobs", "worker", "--burst"]).output
assert "Express Load completed" in stdout
def test_download_resource_data_with_ckanext_xloader_site_url(self, cli, data):
# Set the ckanext.xloader.site_url in the config
with mock.patch.dict(toolkit.config, {'ckanext.xloader.site_url': 'http://xloader-site-url'}):
data['metadata']['original_url'] = 'http://xloader-site-url/resource.csv'
self.enqueue(jobs.xloader_data_into_datastore, [data])
with mock.patch("ckanext.xloader.jobs.get_response", get_response):
stdout = cli.invoke(ckan, ["jobs", "worker", "--burst"]).output
assert "Express Load completed" in stdout

resource = helpers.call_action("resource_show", id=data["metadata"]["resource_id"])
assert resource["datastore_contains_all_records_of_source_file"]
resource = helpers.call_action("resource_show", id=data["metadata"]["resource_id"])
assert resource["datastore_contains_all_records_of_source_file"]

def test_download_resource_data_with_ckan_site_url(self, cli, data):
# Set the ckan.site_url in the config
with mock.patch.dict(toolkit.config, {'ckan.site_url': 'http://ckan-site-url'}):
data['metadata']['original_url'] = 'http://ckan-site-url/resource.csv'
self.enqueue(jobs.xloader_data_into_datastore, [data])
with mock.patch("ckanext.xloader.jobs.get_response", get_response):
stdout = cli.invoke(ckan, ["jobs", "worker", "--burst"]).output
assert "Express Load completed" in stdout

resource = helpers.call_action("resource_show", id=data["metadata"]["resource_id"])
assert resource["datastore_contains_all_records_of_source_file"]

def test_download_resource_data_with_different_original_url(self, cli, data):
# Set the ckan.site_url in the config
with mock.patch.dict(toolkit.config, {'ckan.site_url': 'http://ckan-site-url'}):
data['metadata']['original_url'] = 'http://external-site-url/resource.csv'
self.enqueue(jobs.xloader_data_into_datastore, [data])
with mock.patch("ckanext.xloader.jobs.get_response", get_response):
stdout = cli.invoke(ckan, ["jobs", "worker", "--burst"]).output
assert "Express Load completed" in stdout

resource = helpers.call_action("resource_show", id=data["metadata"]["resource_id"])
assert resource["datastore_contains_all_records_of_source_file"]

def test_callback_xloader_hook_with_ckanext_xloader_site_url(self, cli, data):
# Set the ckanext.xloader.site_url in the config
with mock.patch.dict(toolkit.config, {'ckanext.xloader.site_url': 'http://xloader-site-url'}):
data['result_url'] = 'http://xloader-site-url/api/3/action/xloader_hook'
self.enqueue(jobs.xloader_data_into_datastore, [data])
with mock.patch("ckanext.xloader.jobs.get_response", get_response):
stdout = cli.invoke(ckan, ["jobs", "worker", "--burst"]).output
assert "Express Load completed" in stdout

resource = helpers.call_action("resource_show", id=data["metadata"]["resource_id"])
assert resource["datastore_contains_all_records_of_source_file"]

def test_callback_xloader_hook_with_ckan_site_url(self, cli, data):
# Set the ckan.site_url in the config
with mock.patch.dict(toolkit.config, {'ckan.site_url': 'http://ckan-site-url'}):
data['result_url'] = 'http://ckan-site-url/api/3/action/xloader_hook'
self.enqueue(jobs.xloader_data_into_datastore, [data])
with mock.patch("ckanext.xloader.jobs.get_response", get_response):
stdout = cli.invoke(ckan, ["jobs", "worker", "--burst"]).output
assert "Express Load completed" in stdout

resource = helpers.call_action("resource_show", id=data["metadata"]["resource_id"])
assert resource["datastore_contains_all_records_of_source_file"]

def test_xloader_ignore_hash(self, cli, data):
self.enqueue(jobs.xloader_data_into_datastore, [data])
Expand Down

0 comments on commit 56f840b

Please sign in to comment.