S3 connector tests #7

Status: Open. Wants to merge 63 commits into base: main.

Commits (63)
6456c12 feat: adding azure-connector (vinayak-sanketika, May 10, 2024)
ec218c5 Initial Commit (saimanaswini-k, May 10, 2024)
0274f0f initial (saimanaswini-k, May 10, 2024)
0ef519d Update gcs.py (saimanaswini-k, May 10, 2024)
3cfeba4 Update gcs.py (saimanaswini-k, May 10, 2024)
72fb940 Update gcs.py (saimanaswini-k, May 14, 2024)
4de96f0 feat: added the read form parquet format (vinayak-sanketika, May 14, 2024)
a11e1b0 feat: structuring the azure code (vinayak-sanketika, May 14, 2024)
2c04400 Update gcs.py (saimanaswini-k, May 14, 2024)
3807762 Update gcs.py (saimanaswini-k, May 14, 2024)
97fdcd3 Update connector.py (saimanaswini-k, May 15, 2024)
fd8ef6b Update object_info.py (saimanaswini-k, May 15, 2024)
add758d feat: updating the fetch and update tags (vinayak-sanketika, May 15, 2024)
471d698 Merge remote-tracking branch 'origin/main' into azure-connector (vinayak-sanketika, May 15, 2024)
b42dc6d obsrv-labs/issue-tracker#6: feat: changes made in config.yaml and add… (vinayak-sanketika, May 15, 2024)
f8e790b Merge branch 'main' of https://github.com/obsrv-labs/object-store-con… (saimanaswini-k, May 15, 2024)
3f1aee5 Merge branch 'gcs-connector' of https://github.com/obsrv-labs/object-… (saimanaswini-k, May 15, 2024)
1ec0f98 obsrv-labs/issue-tracker#2: gcs connector enhancements (saimanaswini-k, May 16, 2024)
69771d4 obsrv-labs/issue-tracker#6: lint and cleanup (vinayak-sanketika, May 16, 2024)
b4d2277 obsrv-labs/issue-tracker#6: fix: Changes made in exception and update… (vinayak-sanketika, May 17, 2024)
ccefd53 obsrv-labs/issue-tracker#2: gcs connector enhancements (saimanaswini-k, May 17, 2024)
1d171cd obsrv-labs/issue-tracker#6: feat: lint and cleanup (vinayak-sanketika, May 17, 2024)
562f004 obsrv-labs/issue-tracker#6: lint and cleanup (vinayak-sanketika, May 17, 2024)
deae8e3 obsrv-labs/issue-tracker#2: Implemented Pagination Functionality (saimanaswini-k, May 17, 2024)
5a25be0 obsrv-labs/issue-tracker#6: feat: increased the results_per_page to 1… (vinayak-sanketika, May 17, 2024)
3ed0a50 Update object_info.py (saimanaswini-k, May 17, 2024)
b535dab obsrv-labs/issue-tracker#2: Code Cleanup (saimanaswini-k, May 20, 2024)
4787e66 obsrv-labs/issue-tracker#2: Added GCS Dependencies (saimanaswini-k, May 20, 2024)
ef4cde9 obsrv-labs/issue-tracker#2: Cleanup (saimanaswini-k, May 20, 2024)
d6b8e76 obsrv-labs/issue-tracker#2: Added GCS Dependency (saimanaswini-k, May 20, 2024)
8a80e44 Delete (saimanaswini-k, May 20, 2024)
d58df5b obsrv-labs/issue-tracker#2: Cleanup (saimanaswini-k, May 20, 2024)
9a22f4f obsrv-labs/issue-tracker#2: Jar File (saimanaswini-k, May 20, 2024)
95af58f Delete (saimanaswini-k, May 20, 2024)
6fe3248 obsrv-labs/issue-tracker#2: Model for gcs (saimanaswini-k, May 20, 2024)
6952d73 obsrv-labs/issue-tracker#2: Cleanup (saimanaswini-k, May 20, 2024)
d72f7c7 obsrv-labs/issue-tracker#2: Model for gcs (saimanaswini-k, May 20, 2024)
f5622f7 obsrv-labs/issue-tracker#2: gcs connector (saimanaswini-k, May 20, 2024)
20149ac obsrv-labs/issue-tracker#6: fix: updated the config, poetry depende… (vinayak-sanketika, May 21, 2024)
2ba2530 obsrv-labs/issue-tracker#6: feat: added the test folder (vinayak-sanketika, May 22, 2024)
944af8c obsrv-labs/issue-tracker#2: Added tests folder (saimanaswini-k, May 27, 2024)
0797f2e obsrv-labs/issue-tracker#2: Added tests folder (saimanaswini-k, May 27, 2024)
add55cf obsrv-labs/issue-tracker#2: Added tests folder (saimanaswini-k, May 27, 2024)
189820b obsrv-labs/issue-tracker#2: Added tests folder (saimanaswini-k, May 27, 2024)
e4e71e4 obsrv-labs/issue-tracker#2: Added Sample Data (saimanaswini-k, May 27, 2024)
2eb866d obsrv-labs/issue-tracker#2: Added Sample Data (saimanaswini-k, May 27, 2024)
7e69c0a obsrv-labs/issue-tracker#2: Updated test case (saimanaswini-k, May 27, 2024)
13282a7 obsrv-labs/issue-tracker#6: feat: added test case for azure-connector (vinayak-sanketika, May 27, 2024)
cac0c0c obsrv-labs/issue-tracker#2: Updated test case (saimanaswini-k, May 27, 2024)
cc0c85d obsrv-labs/issue-tracker#6: cleanup (vinayak-sanketika, May 27, 2024)
d8c45ce obsrv-labs/issue-tracker#6: fix: removed the with_bind_ports() (vinayak-sanketika, May 27, 2024)
bf86a6f obsrv-labs/issue-tracker#6: cleanup (vinayak-sanketika, May 27, 2024)
07901d5 obsrv-labs/issue-tracker#6: feat: added the condition to check if the… (vinayak-sanketika, May 28, 2024)
251e9da obsrv-labs/issue-tracker#6: Cleanup (vinayak-sanketika, Jun 2, 2024)
78e5084 obsrv-labs/issue-tracker#6 feat: added the assert for num_api_calls,… (vinayak-sanketika, Jun 2, 2024)
f071daa obsrv-labs/issue-tracker#6 cleanup (vinayak-sanketika, Jun 3, 2024)
9b1e642 obsrv-labs/issue-tracker#6: cleanup (vinayak-sanketika, Jun 3, 2024)
0231afe obsrv-labs/issue-tracker#6: cleanup (vinayak-sanketika, Jun 3, 2024)
710f18f obsrv-labs/issue-tracker#6: cleanup (vinayak-sanketika, Jun 4, 2024)
e90e903 obsrv-labs/issue-tracker#6: cleanup (vinayak-sanketika, Jun 4, 2024)
c22d2d1 obsrv-labs/issue-tracker#6: Merge branch 'azure-connector-tests' into… (vinayak-sanketika, Jun 4, 2024)
6c0dbb9 obsrv-labs/issue-tracker#6: feat: checked with azure cred and cleanup (vinayak-sanketika, Jun 7, 2024)
74f73bb obsrv-labs/issue-tracker#6: feat :cleanup (vinayak-sanketika, Jun 7, 2024)
9 changes: 5 additions & 4 deletions object_store_connector/config/config.yaml
@@ -6,16 +6,17 @@ postgres:
   port: 5432
 
 kafka:
-  bootstrap-servers: localhost:9092
+  broker-servers: localhost:9092
   telemetry-topic: obsrv-connectors-telemetry
   connector-metrics-topic: obsrv-connectors-metrics
   producer:
     compression: snappy
     max-request-size: 1000000 # 1MB {1M: 1000000, 10M: 10000000, 5M: 5000000}
 
-obsrv_encryption_key: random_thirty_two_encrypt_string
+obsrv_encryption_key: strong_encryption_key_to_encrypt
 
-connector_instance_id: s3.new-york-taxi-data.1
+connector_instance_id: gcs.new-york-taxi-data.1
+connector_instance_id: azure.new-york-taxi-data.1
 
 building-block: rv-test
-env: local
+env: local
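
For reference, the per-source "source" section that the new providers read is configured elsewhere and is not part of this diff. A minimal sketch of the Azure variant, assuming the keys consumed by provider/azure.py below; all values are placeholders, not part of this PR:

# Hypothetical connector_config, shaped after the keys AzureBlobStorage
# reads in provider/azure.py; values are placeholders.
connector_config = {
    "source": {
        "type": "azure_blob",  # dispatch key checked in connector.py
        "containername": "new-york-taxi-data",
        "blob_endpoint": "core.windows.net",  # or a full emulator URL
        "prefix": "/",
        "credentials": {
            "account_name": "myaccount",
            "account_key": "<account-key>",
        },
    }
}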
10 changes: 9 additions & 1 deletion object_store_connector/connector.py
@@ -13,7 +13,10 @@
 from pyspark.conf import SparkConf
 from pyspark.sql import DataFrame, SparkSession
 from pyspark.sql.functions import lit
+from provider.gcs import GCS
 
+from provider.azure import AzureBlobStorage
+from models.object_info import ObjectInfo
 logger = LoggerController(__name__)
 
 MAX_RETRY_COUNT = 10
@@ -71,6 +74,11 @@ def get_spark_conf(self, connector_config) -> SparkConf:
     def _get_provider(self, connector_config: Dict[Any, Any]):
         if connector_config["source"]["type"] == "s3":
             self.provider = S3(connector_config)
+        elif connector_config["source"]["type"] == "azure_blob":
+            self.provider = AzureBlobStorage(connector_config)
+
+        elif connector_config["source"]["type"] == "gcs":
+            self.provider = GCS(connector_config)
         else:
             ObsrvException(
                 ErrorData(
@@ -93,7 +101,6 @@ def _get_objects_to_process(
                     "INVALID_CONTEXT", "building_block or env not found in context"
                 )
             )
-
         if not len(objects):
             num_files_discovered = ctx.stats.get_stat("num_files_discovered", 0)
             objects = self.provider.fetch_objects(ctx, metrics_collector)
@@ -177,4 +184,5 @@ def _exclude_processed_objects(self, ctx: ConnectorContext, objects):
             if not any(tag["key"] == self.dedupe_tag for tag in obj.get("tags")):
                 to_be_processed.append(obj)
 
+        # return [objects[-1]] #TODO: Remove this line
         return to_be_processed
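
The dedupe filter above skips any object that already carries the connector's dedupe tag. A standalone sketch of that check, assuming a dedupe tag key of "processed" (the real key is configured elsewhere in the connector and is not shown in this diff):

# Illustrative only: mirrors the tag check in _exclude_processed_objects().
dedupe_tag = "processed"  # hypothetical tag key
objects = [
    {"location": "container/a.csv", "tags": [{"key": "processed", "value": "true"}]},
    {"location": "container/b.csv", "tags": []},
]
to_be_processed = [
    obj for obj in objects
    if not any(tag["key"] == dedupe_tag for tag in obj.get("tags"))
]
print([o["location"] for o in to_be_processed])  # ['container/b.csv']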
12 changes: 10 additions & 2 deletions object_store_connector/models/object_info.py
@@ -10,12 +10,20 @@ class Tag:
     value: str
 
     def to_dict(self):
-        return {"key": self.key, "value": self.value}
-
+        return {
+            "key": self.key,
+            "value": self.value}
+
     def to_aws(self):
         return {"Key": self.key, "Value": self.value}
 
+    def to_gcs(self):
+        return {"Key": self.key, "Value": self.value}
+
+
+
+
 
 @dataclass
 class ObjectInfo:
     id: str = field(default_factory=lambda: str(uuid4()))
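
A quick illustration of the three serializers. Note that to_gcs() currently returns the same capitalized "Key"/"Value" shape as to_aws(), not the lower-case to_dict() form:

# Illustrative usage of the Tag dataclass from models/object_info.py.
tag = Tag(key="processed", value="true")
print(tag.to_dict())  # {'key': 'processed', 'value': 'true'}
print(tag.to_aws())   # {'Key': 'processed', 'Value': 'true'}
print(tag.to_gcs())   # {'Key': 'processed', 'Value': 'true'} (same as to_aws)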
262 changes: 262 additions & 0 deletions object_store_connector/provider/azure.py
@@ -0,0 +1,262 @@
from typing import Any, Dict, List

from azure.core.exceptions import AzureError
from azure.storage.blob import BlobClient, ContainerClient
from pyspark.conf import SparkConf
from pyspark.sql import DataFrame, SparkSession

from obsrv.common import ObsrvException
from obsrv.connector import ConnectorContext, MetricsCollector
from obsrv.job.batch import get_base_conf
from obsrv.models import ErrorData

from models.object_info import ObjectInfo, Tag
from provider.blob_provider import BlobProvider


class AzureBlobStorage(BlobProvider):
    def __init__(self, connector_config: Dict[Any, Any]) -> None:
        super().__init__()
        self.connector_config = connector_config
        self.account_name = connector_config["source"]["credentials"]["account_name"]
        self.account_key = connector_config["source"]["credentials"]["account_key"]
        self.container_name = connector_config["source"]["containername"]
        self.blob_endpoint = connector_config["source"]["blob_endpoint"]
        self.prefix = connector_config["source"].get("prefix", "/")

        # A bare "core.windows.net" endpoint is treated as the public-cloud
        # suffix; anything else (e.g. an Azurite emulator URL) is used as a
        # full blob endpoint.
        if self.blob_endpoint == "core.windows.net":
            self.connection_string = (
                f"DefaultEndpointsProtocol=https;AccountName={self.account_name};"
                f"AccountKey={self.account_key};EndpointSuffix={self.blob_endpoint}"
            )
        else:
            self.connection_string = (
                f"DefaultEndpointsProtocol=https;AccountName={self.account_name};"
                f"AccountKey={self.account_key};BlobEndpoint={self.blob_endpoint}"
            )

        self.container_client = ContainerClient.from_connection_string(
            self.connection_string, self.container_name
        )

    def get_spark_config(self, connector_config) -> SparkConf:
        conf = get_base_conf()
        conf.setAppName("ObsrvObjectStoreConnector")
        conf.set("spark.jars.packages", "org.apache.hadoop:hadoop-azure:3.3.1")
        conf.set("fs.azure.storage.accountAuthType", "SharedKey")
        conf.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
        conf.set(
            f"fs.azure.account.key.{self.account_name}.blob.core.windows.net",
            self.account_key,
        )
        conf.set(
            "fs.azure.storage.accountKey",
            connector_config["source"]["credentials"]["account_key"],
        )
        return conf

    def fetch_objects(self, ctx: ConnectorContext, metrics_collector: MetricsCollector) -> List[ObjectInfo]:
        objects = self._list_blobs_in_container(ctx, metrics_collector=metrics_collector)
        objects_info = []
        if objects is None:
            raise Exception("No objects found")

        for obj in objects:
            if self.blob_endpoint == "core.windows.net":
                blob_location = f"wasbs://{self.container_name}@{self.account_name}.blob.core.windows.net/{obj['name']}"
            else:
                # Emulator endpoints are addressed through the wasb:// scheme.
                blob_location = f"wasb://{self.container_name}@storageemulator/{obj['name']}"

            object_info = ObjectInfo(
                location=blob_location,
                format=obj["name"].split(".")[-1],
                file_size_kb=obj["size"] // 1024,
                file_hash=obj["etag"].strip('"'),
                tags=self.fetch_tags(obj["name"], metrics_collector),
            )
            objects_info.append(object_info.to_json())

        return objects_info

    def read_object(self, object_path: str, sc: SparkSession, metrics_collector: MetricsCollector, file_format: str) -> DataFrame:
        labels = [
            {"key": "request_method", "value": "GET"},
            {"key": "method_name", "value": "getObject"},
            {"key": "object_path", "value": object_path},
        ]
        api_calls, errors, records_count = 0, 0, 0

        try:
            if file_format == "jsonl":
                df = sc.read.format("json").load(object_path)
            elif file_format == "json":
                df = sc.read.format("json").option("multiLine", False).load(object_path)
            elif file_format == "csv":
                df = sc.read.format("csv").option("header", True).load(object_path)
            elif file_format == "parquet":
                df = sc.read.parquet(object_path)
            else:
                raise ObsrvException(ErrorData("UNSUPPORTED_FILE_FORMAT", f"unsupported file format: {file_format}"))
            records_count = df.count()
            api_calls += 1
            metrics_collector.collect({"num_api_calls": api_calls, "num_records": records_count}, addn_labels=labels)
            return df
        except AzureError as exception:
            errors += 1
            labels += [{"key": "error_code", "value": str(exception.exc_msg)}]
            metrics_collector.collect("num_errors", errors, addn_labels=labels)
            ObsrvException(
                ErrorData(
                    "AzureBlobStorage_READ_ERROR",
                    f"failed to read object from AzureBlobStorage: {str(exception)}",
                )
            )
            return None
        except Exception as exception:
            errors += 1
            labels += [{"key": "error_code", "value": "AzureBlobStorage_READ_ERROR"}]
            metrics_collector.collect("num_errors", errors, addn_labels=labels)
            ObsrvException(
                ErrorData(
                    "AzureBlobStorage_READ_ERROR",
                    f"failed to read object from AzureBlobStorage: {str(exception)}",
                )
            )
            return None

    def _list_blobs_in_container(self, ctx: ConnectorContext, metrics_collector) -> list:
        self.container_name = self.connector_config["source"]["containername"]
        summaries = []
        continuation_token = None
        file_formats = {
            "json": ["json", "json.gz", "json.zip"],
            "jsonl": ["json", "json.gz", "json.zip"],
            "csv": ["csv", "csv.gz", "csv.zip"],
            "parquet": ["parquet"],  # keep discovery in sync with read_object()
        }
        file_format = ctx.data_format
        # metrics
        api_calls, errors = 0, 0

        labels = [
            {"key": "request_method", "value": "GET"},
            {"key": "method_name", "value": "list_blobs"},
        ]

        while True:
            try:
                # Page through the listing: by_page(None) starts from the
                # beginning, and the pager's continuation_token points at the
                # next page (None once the listing is exhausted).
                pages = self.container_client.list_blobs(
                    results_per_page=1000
                ).by_page(continuation_token=continuation_token)
                page = next(pages, None)
                api_calls += 1
                if page is None:
                    break

                for blob in page:
                    if any(blob["name"].endswith(f) for f in file_formats[file_format]):
                        summaries.append(blob)

                continuation_token = pages.continuation_token
                if not continuation_token:
                    break
            except AzureError as exception:
                errors += 1
                labels += [
                    {
                        "key": "error_code",
                        "value": str(exception.exc_msg),
                    }
                ]
                metrics_collector.collect("num_errors", errors, addn_labels=labels)
                ObsrvException(
                    ErrorData(
                        "AZURE_BLOB_LIST_ERROR",
                        f"failed to list objects in AzureBlobStorage: {str(exception)}",
                    )
                )
                break  # do not retry forever on a persistent listing error

        metrics_collector.collect("num_api_calls", api_calls, addn_labels=labels)
        return summaries

    def _get_spark_session(self):
        # get_spark_config() requires the connector config; pass the one
        # captured at construction time.
        return SparkSession.builder.config(
            conf=self.get_spark_config(self.connector_config)
        ).getOrCreate()

    def fetch_tags(self, object_path: str, metrics_collector: MetricsCollector) -> List[Tag]:
        labels = [
            {"key": "request_method", "value": "GET"},
            {"key": "method_name", "value": "get_blob_tags"},
            {"key": "object_path", "value": object_path},
        ]
        api_calls, errors = 0, 0
        try:
            blob_client = BlobClient.from_connection_string(
                conn_str=self.connection_string,
                container_name=self.container_name,
                blob_name=object_path,
            )
            tags = blob_client.get_blob_tags()
            api_calls += 1
            metrics_collector.collect("num_api_calls", api_calls, addn_labels=labels)
            return [Tag(key, value) for key, value in tags.items()]
        except AzureError as exception:
            errors += 1
            labels += [{"key": "error_code", "value": str(exception.exc_msg)}]
            metrics_collector.collect("num_errors", errors, addn_labels=labels)
            ObsrvException(
                ErrorData(
                    "AzureBlobStorage_TAG_READ_ERROR",
                    f"failed to fetch tags from AzureBlobStorage: {str(exception)}",
                )
            )
            return []  # treat unreadable tags as "no tags" rather than None

    def update_tag(self, object: ObjectInfo, tags: list, metrics_collector: MetricsCollector) -> bool:
        labels = [
            {"key": "request_method", "value": "PUT"},
            {"key": "method_name", "value": "set_blob_tags"},
            {"key": "object_path", "value": object.get("location")},
        ]
        api_calls, errors = 0, 0
        try:
            new_dict = {tag["key"]: tag["value"] for tag in tags}
            # Derive the blob name from the last path segment of the wasb(s) URL.
            location = object.get("location")
            blob_name = location.split("/")[-1]

            blob_client = BlobClient.from_connection_string(
                conn_str=self.connection_string,
                container_name=self.container_name,
                blob_name=blob_name,
            )
            existing_tags = blob_client.get_blob_tags() or {}
            existing_tags.update(new_dict)

            blob_client.set_blob_tags(existing_tags)
            api_calls += 1
            metrics_collector.collect("num_api_calls", api_calls, addn_labels=labels)
            return True
        except AzureError as exception:
            errors += 1
            labels += [{"key": "error_code", "value": str(exception.exc_msg)}]
            metrics_collector.collect("num_errors", errors, addn_labels=labels)
            ObsrvException(
                ErrorData(
                    "AzureBlobStorage_TAG_UPDATE_ERROR",
                    f"failed to update tags in AzureBlobStorage for object: {str(exception)}",
                )
            )
            return False


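To tie the pieces together, here is a minimal, hypothetical driver that exercises AzureBlobStorage directly, outside the connector runtime. The credentials and container name are placeholders, and ctx/metrics_collector would normally be supplied by the obsrv SDK, so the provider calls are left commented:

# Hypothetical usage sketch; not part of this PR.
from pyspark.sql import SparkSession

connector_config = {
    "source": {
        "type": "azure_blob",
        "containername": "my-container",
        "blob_endpoint": "core.windows.net",  # or an Azurite emulator URL
        "credentials": {
            "account_name": "myaccount",
            "account_key": "<account-key>",
        },
    }
}

provider = AzureBlobStorage(connector_config)
spark = SparkSession.builder.config(
    conf=provider.get_spark_config(connector_config)
).getOrCreate()

# With a real obsrv ConnectorContext and MetricsCollector:
# objects = provider.fetch_objects(ctx, metrics_collector)
# df = provider.read_object(objects[0]["location"], spark, metrics_collector, "csv")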