Skip to content
This repository has been archived by the owner on Sep 30, 2024. It is now read-only.

Commit

Permalink
Merge pull request #29 from nevermined-io/feature/oauth2
Browse files Browse the repository at this point in the history
Added jwt support
  • Loading branch information
Rodolphe Marques authored Dec 4, 2020
2 parents 433aa3d + 5119ba8 commit 222868d
Show file tree
Hide file tree
Showing 9 changed files with 292 additions and 155 deletions.
33 changes: 24 additions & 9 deletions .github/workflows/pythonpackage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,35 @@ jobs:
uses: actions/setup-python@v1
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies

- name: Install minikube v1.12.0
run: |
wget https://storage.googleapis.com/minikube/releases/v1.12.0/minikube-linux-amd64
chmod +x minikube-linux-amd64
sudo mv minikube-linux-amd64 /usr/local/bin/minikube
- name: Reclaim some disk space
run : |
docker system prune --all --volumes -f
- name: Start Nevermined
run: |
docker login -u ${{ secrets.NEVERMINED_DOCKER_USERNAME }} -p ${{ secrets.NEVERMINED_DOCKER_TOKEN}}
git clone https://github.com/nevermined-io/tools nevermined-tools
cd nevermined-tools
rm -rf "${HOME}/.nevermined/nevermined-contracts/artifacts"
bash -x start_nevermined.sh --no-commons --no-faucet --latest 2>&1 > start_nevermined.log &
# start nevermined with the compute stack
./start_nevermined.sh --latest --no-commons --local-spree-node --events-handler --compute &
# wait for the compute api to be online.
# the compute api is the last service to come online
./scripts/wait_for_compute_api.sh
# extract contracts
cd ..
for i in $(seq 1 50); do
sleep 5
[ -f "${HOME}/.nevermined/nvermind-contracts/artifacts/ready" ] && break
done
ls -la "${HOME}/.nevermined/nevermined-contracts/artifacts/"
./scripts/wait_for_migration_and_extract_keeper_artifacts.sh
python -m pip install pip==20.2.4
- name: Install python package
run: |
pip install pip==20.2.4
pip install -r requirements_dev.txt
- name: Test with pytest
run: |
Expand All @@ -47,5 +61,6 @@ jobs:
export PARITY_ADDRESS1=0x068ed00cf0441e4829d9784fcbe7b9e26d4bd8d0
export PARITY_PASSWORD1=secret
export PARITY_KEYFILE1=tests/resources/data/key_file_1.json
pip install pytest
pytest -v
pytest -v
5 changes: 4 additions & 1 deletion nevermined_sdk_py/assets/asset_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class AssetConsumer:

@staticmethod
def access(service_agreement_id, service_index, ddo, consumer_account, destination,
gateway, secret_store, index=None):
gateway, secret_store, config, index=None):
"""
Download asset data files or result files from a compute job.
Expand All @@ -24,6 +24,7 @@ def access(service_agreement_id, service_index, ddo, consumer_account, destinati
:param destination: Path, str
:param gateway: Gateway instance
:param secret_store: SecretStore instance
:param config: Sdk configuration instance
:param index: Index of the document that is going to be downloaded, int
:return: Asset folder path, str
"""
Expand Down Expand Up @@ -55,6 +56,7 @@ def access(service_agreement_id, service_index, ddo, consumer_account, destinati
consume_url,
consumer_account,
asset_folder,
config,
index
)
else:
Expand All @@ -65,6 +67,7 @@ def access(service_agreement_id, service_index, ddo, consumer_account, destinati
consume_url,
consumer_account,
asset_folder,
config,
i
)

Expand Down
4 changes: 2 additions & 2 deletions nevermined_sdk_py/assets/asset_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ class AssetExecutor:
"""Class representing the call to the Gateway execute endpoint."""

@staticmethod
def execute(agreement_id, compute_ddo, workflow_ddo, consumer_account, gateway, index):
def execute(agreement_id, compute_ddo, workflow_ddo, consumer_account, gateway, index, config):
"""
:param agreement_id:
Expand All @@ -22,6 +22,6 @@ def execute(agreement_id, compute_ddo, workflow_ddo, consumer_account, gateway,
service_endpoint = ServiceAgreement.from_ddo(ServiceTypes.CLOUD_COMPUTE,
compute_ddo).service_endpoint
response = gateway.execute_compute_service(agreement_id, service_endpoint,
consumer_account, workflow_ddo)
consumer_account, workflow_ddo, config)

return response.json()["workflowId"]
173 changes: 89 additions & 84 deletions nevermined_sdk_py/gateway/gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@
from common_utils_py.agreements.service_agreement import ServiceAgreement
from common_utils_py.exceptions import EncryptAssetUrlsError
from common_utils_py.http_requests.requests_session import get_requests_session
from common_utils_py.oauth2.token import (NeverminedJWTBearerGrant,
generate_access_grant_token,
generate_download_grant_token,
generate_execute_grant_token,
generate_compute_grant_token)
from contracts_lib_py.utils import add_ethereum_prefix_and_hash_msg

from nevermined_sdk_py.nevermined.keeper import NeverminedKeeper as Keeper

logger = logging.getLogger(__name__)
Expand All @@ -25,6 +29,11 @@ class Gateway:
"""
_http_client = get_requests_session()
_tokens_cache = {}

@staticmethod
def _generate_cache_key(*args):
return ''.join(args)

@staticmethod
def set_http_client(http_client):
Expand Down Expand Up @@ -83,62 +92,25 @@ def encrypt_files_dict(files_dict, encrypt_endpoint, asset_id, method):
return json.loads(response.text)['hash']

@staticmethod
def consume_service(did, service_agreement_id, service_endpoint, account, files,
destination_folder, index=None):
"""
Call the Gateway endpoint to get access to the different files that form the asset.
:param service_agreement_id: Service Agreement Id, str
:param service_endpoint: Url to access, str
:param account: Account instance of the consumer signing this agreement, hex-str
:param files: List containing the files to be consumed, list
:param index: Index of the document that is going to be downloaded, int
:param destination_folder: Path, str
:return: True if was downloaded, bool
"""
signature = Keeper.get_instance().sign_hash(
add_ethereum_prefix_and_hash_msg(service_agreement_id),
account)
headers = dict({
'X-Consumer-Address': account.address,
'X-Signature': signature,
'X-DID': did
})

if index is not None:
assert isinstance(index, int), logger.error('index has to be an integer.')
assert index >= 0, logger.error('index has to be 0 or a positive integer.')
assert index < len(files), logger.error(
'index can not be bigger than the number of files')
consume_url = Gateway._create_access_url(service_endpoint, service_agreement_id, index)
logger.info(f'invoke access endpoint with this url: {consume_url}')
response = Gateway._http_client.get(consume_url, headers=headers, stream=True)
file_name = Gateway._get_file_name(response)
Gateway.write_file(response, destination_folder, file_name)
def access_service(did, service_agreement_id, service_endpoint, account, destination_folder, config, index):
cache_key = Gateway._generate_cache_key(account.address, service_agreement_id, did)
if cache_key not in Gateway._tokens_cache:
grant_token = generate_access_grant_token(account, service_agreement_id, did)
access_token = Gateway.fetch_token(grant_token, config)
Gateway._tokens_cache[cache_key] = access_token
else:
for i, _file in enumerate(files):
consume_url = Gateway._create_access_url(service_endpoint, service_agreement_id, i)
logger.info(f'invoke access endpoint with this url: {consume_url}')
response = Gateway._http_client.get(consume_url, headers=headers, stream=True)
file_name = Gateway._get_file_name(response)
Gateway.write_file(response, destination_folder, file_name or f'file-{i}')
access_token = Gateway._tokens_cache[cache_key]

@staticmethod
def access_service(did, service_agreement_id, service_endpoint, account, destination_folder, index):
signature = Keeper.get_instance().sign_hash(
add_ethereum_prefix_and_hash_msg(service_agreement_id),
account)
headers = dict({
'X-Consumer-Address': account.address,
'X-Signature': signature,
'X-DID': did
})
consume_url = Gateway._create_access_url(service_endpoint, service_agreement_id, index)
headers = {"Authorization": f"Bearer {access_token}"}

response = Gateway._http_client.get(consume_url, headers=headers, stream=True)
if response.status_code != 200:
if not response.ok:
raise ValueError(response.text)

file_name = Gateway._get_file_name(response)
Gateway.write_file(response, destination_folder, file_name or f'file-{index}')

return response

@staticmethod
Expand All @@ -156,16 +128,19 @@ def compute_logs(service_agreement_id, execution_id, account, config):
:py:class:`requests.Response`: HTTP server response
"""
signature = Keeper.get_instance().sign_hash(
add_ethereum_prefix_and_hash_msg(execution_id),
account)
headers = {
'X-Consumer-Address': account.address,
'X-Signature': signature,
}
cache_key = Gateway._generate_cache_key(account.address, service_agreement_id, execution_id)
if cache_key not in Gateway._tokens_cache:
grant_token = generate_compute_grant_token(account, service_agreement_id, execution_id)
access_token = Gateway.fetch_token(grant_token, config)
Gateway._tokens_cache[cache_key] = access_token
else:
access_token = Gateway._tokens_cache[cache_key]

headers = {"Authorization": f"Bearer {access_token}"}
consume_url = Gateway._create_compute_logs_url(config, service_agreement_id, execution_id)

response = Gateway._http_client.get(consume_url, headers=headers)
if response.status_code != 200:
if not response.ok:
raise ValueError(response.text)
return response

Expand All @@ -184,14 +159,17 @@ def compute_status(service_agreement_id, execution_id, account, config):
:py:class:`requests.Response`: HTTP server response
"""
signature = Keeper.get_instance().sign_hash(
add_ethereum_prefix_and_hash_msg(execution_id),
account)
headers = {
'X-Consumer-Address': account.address,
'X-Signature': signature,
}
cache_key = Gateway._generate_cache_key(account.address, service_agreement_id, execution_id)
if cache_key not in Gateway._tokens_cache:
grant_token = generate_compute_grant_token(account, service_agreement_id, execution_id)
access_token = Gateway.fetch_token(grant_token, config)
Gateway._tokens_cache[cache_key] = access_token
else:
access_token = Gateway._tokens_cache[cache_key]

headers = {"Authorization": f"Bearer {access_token}"}
consume_url = Gateway._create_compute_status_url(config, service_agreement_id, execution_id)

response = Gateway._http_client.get(consume_url, headers=headers)
if response.status_code != 200:
raise ValueError(response.text)
Expand All @@ -213,19 +191,24 @@ def download(did, account, destination_folder, index, config):
:py:class:`requests.Response`: HTTP server response
"""
signature = Keeper.get_instance().sign_hash(
add_ethereum_prefix_and_hash_msg(did), account)
headers = {
'X-Consumer-Address': account.address,
'X-Signature': signature,
'X-DID': did
}
cache_key = Gateway._generate_cache_key(account.address, did)
if cache_key not in Gateway._tokens_cache:
grant_token = generate_download_grant_token(account, did)
access_token = Gateway.fetch_token(grant_token, config)
Gateway._tokens_cache[cache_key] = access_token
else:
access_token = Gateway._tokens_cache[cache_key]

headers = {"Authorization": f"Bearer {access_token}"}
consume_url = Gateway._create_download_url(config, index)

response = Gateway._http_client.get(consume_url, headers=headers, stream=True)
if response.status_code != 200:
if not response.ok:
raise ValueError(response.text)

file_name = Gateway._get_file_name(response)
Gateway.write_file(response, destination_folder, file_name or f'file-{index}')

return response

@staticmethod
Expand All @@ -247,21 +230,33 @@ def execute_service(service_agreement_id, service_endpoint, account, workflow_dd
return response

@staticmethod
def execute_compute_service(service_agreement_id, service_endpoint, account, workflow_ddo):
signature = Keeper.get_instance().sign_hash(
add_ethereum_prefix_and_hash_msg(service_agreement_id),
account)
headers = dict({
'X-Consumer-Address': account.address,
'X-Signature': signature,
'X-Workflow-DID': workflow_ddo.did
})
def execute_compute_service(service_agreement_id, service_endpoint, account, workflow_ddo, config):
cache_key = Gateway._generate_cache_key(account.address, service_agreement_id, workflow_ddo.did)
if cache_key not in Gateway._tokens_cache:
grant_token = generate_execute_grant_token(account, service_agreement_id, workflow_ddo.did)
access_token = Gateway.fetch_token(grant_token, config)
Gateway._tokens_cache[cache_key] = access_token
else:
access_token = Gateway._tokens_cache[cache_key]

headers = {"Authorization": f"Bearer {access_token}"}
execute_url = Gateway._create_compute_url(service_endpoint, service_agreement_id)
response = Gateway._http_client.post(execute_url, headers= headers)
if response.status_code != 200:

response = Gateway._http_client.post(execute_url, headers=headers)
if not response.ok:
raise ValueError(response.text)
return response

@staticmethod
def fetch_token(grant_token, config):
fetch_token_url = Gateway.get_fetch_token_endpoint(config)
response = Gateway._http_client.post(fetch_token_url, data={
"grant_type": NeverminedJWTBearerGrant.GRANT_TYPE,
"assertion": grant_token
})
if not response.ok:
raise ValueError(response.text)
return response.json()["access_token"]

@staticmethod
def _prepare_consume_payload(did, service_agreement_id, service_index, signature,
Expand Down Expand Up @@ -392,6 +387,16 @@ def get_publish_endpoint(config):
"""
return f'{Gateway.get_gateway_url(config)}/services/publish'

@staticmethod
def get_fetch_token_endpoint(config):
"""
Return the url to fetch an access token.
:param config: Config
:return: Url, str
"""
return f'{Gateway.get_gateway_url(config)}/services/oauth/token'

@staticmethod
def get_rsa_public_key(config):
return Gateway._http_client.get(config.gateway_url).json()['rsa-public-key']
Expand Down
4 changes: 3 additions & 1 deletion nevermined_sdk_py/nevermined/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,7 @@ def access(self, service_agreement_id, did, service_index, consumer_account,
destination,
GatewayProvider.get_gateway(),
self._get_secret_store(consumer_account),
self._config,
index
)

Expand Down Expand Up @@ -597,7 +598,8 @@ def execute(self, agreement_id, did, index, consumer_account, workflow_did):
workflow_ddo,
consumer_account,
GatewayProvider.get_gateway(),
index
index,
self._config
)

@staticmethod
Expand Down
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
'pyopenssl',
'PyJWT', # not jwt
'PyYAML==4.2b4',
'common-utils-py==0.3.0',
'common-utils-py==0.4.1',
'contracts-lib-py==0.5.2',
'nevermined-secret-store==0.1.0',
'requests==2.21.0',
'requests~=2.21.0',
'deprecated',
'pycryptodomex',
'tqdm',
Expand Down
Loading

0 comments on commit 222868d

Please sign in to comment.