From dcf7a410990d3bfa7563cb0fe24289d99b27c94e Mon Sep 17 00:00:00 2001 From: Claudia Sun Date: Sat, 23 Sep 2023 22:35:27 -0700 Subject: [PATCH 1/5] upgrade fabric k8s library --- bpg-config.yaml | 118 ++++++++ pom.xml | 2 +- .../com/apple/spark/api/SubmissionStatus.java | 4 +- .../apple/spark/api/SubmissionSummary.java | 6 +- .../apple/spark/core/ApplicationMonitor.java | 35 +-- .../spark/core/ApplicationUpdateEvent.java | 14 +- .../apple/spark/core/KubernetesHelper.java | 8 +- .../core/RestSubmissionsStreamingOutput.java | 6 +- .../spark/core/RunningApplicationMonitor.java | 40 ++- .../core/SparkApplicationResourceHelper.java | 4 +- ...ionResource.java => SparkApplication.java} | 6 +- .../SparkApplicationResourceDoneable.java | 33 --- .../SparkApplicationResourceList.java | 2 +- .../spark/rest/ApplicationGetLogRest.java | 6 +- .../spark/rest/ApplicationSubmissionRest.java | 119 ++++---- .../java/com/apple/spark/rest/RestBase.java | 68 +++-- .../java/com/apple/spark/util/HttpUtils.java | 263 ++++++++---------- .../core/RunningApplicationMonitorTest.java | 8 +- 18 files changed, 378 insertions(+), 364 deletions(-) create mode 100644 bpg-config.yaml rename src/main/java/com/apple/spark/operator/{SparkApplicationResource.java => SparkApplication.java} (86%) delete mode 100644 src/main/java/com/apple/spark/operator/SparkApplicationResourceDoneable.java diff --git a/bpg-config.yaml b/bpg-config.yaml new file mode 100644 index 0000000..dff8962 --- /dev/null +++ b/bpg-config.yaml @@ -0,0 +1,118 @@ +defaultSparkConf: + spark.kubernetes.submission.connectionTimeout: 30000 + spark.kubernetes.submission.requestTimeout: 30000 + spark.kubernetes.driver.connectionTimeout: 30000 + spark.kubernetes.driver.requestTimeout: 30000 +sparkClusters: + - weight: 100 + id: minikube + eksCluster: arn:aws:eks:us-west-2:168594260668:cluster/og60-skate-eks-02 + masterUrl: https://7EB14B8493566A806F42E3AC481303AA.gr7.us-west-2.eks.amazonaws.com + caCertDataSOPS: LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUM1ekNDQWMrZ0F3SUJBZ0lCQURBTkJna3Foa2lHOXcwQkFRc0ZBREFWTVJNd0VRWURWUVFERXdwcmRXSmwKY201bGRHVnpNQjRYRFRJeU1EUXhOVEl5TVRNeU1Gb1hEVE15TURReE1qSXlNVE15TUZvd0ZURVRNQkVHQTFVRQpBeE1LYTNWaVpYSnVaWFJsY3pDQ0FTSXdEUVlKS29aSWh2Y05BUUVCQlFBRGdnRVBBRENDQVFvQ2dnRUJBTU5oCnpaZGpzazNYbFZtUDc3dlhXVE1xZlNHb3ZPTExTRjU3QUxnVFFGR2lOb3R6WjZvb3BEOStCeW5WWWhoRFpCdlkKV1NuRUN6dFVGSUZTd014STA3bnI2dERPYkk0cGQ1aUNjZzdUV2YzL1ZYMHNQTVNrbGpMUVNpQi9pSHdGSkNlWQpDQVo2cWtvZ29kbFErU0NzL29jYW9XSklBWUI4UmxkZzhQd0FpQ09hNGR5aGF1UlYvdng1WWovS05ucnJqaEZXCkZZZHdvMkRLM0U5OFUwa2VpUTc5QjJvK1pRNEhYZVNYRHczdi96enBLOVQ3ZmJzV2t5NXhYa1EvQU9aMHR3aFcKcmdQQXY3RWhPWTZzeDh1SWY3ZGVtQ1Z0T3lVMUsrYVJkclViekkyd2RpK1E5eEM3NmlDL1ZUbDdwK2I0UzI2UgpTbkc1bi9OWDJoVVdDa0piUXFFQ0F3RUFBYU5DTUVBd0RnWURWUjBQQVFIL0JBUURBZ0trTUE4R0ExVWRFd0VCCi93UUZNQU1CQWY4d0hRWURWUjBPQkJZRUZDbGZmZ1BKRERrcllKdlJOVFptUjRaaysxZU5NQTBHQ1NxR1NJYjMKRFFFQkN3VUFBNElCQVFDS3cwMzhRd1RXYlBvS2ZKZHExRFVuVENXZW1LalVkYlJFZEJyMWdSQ09IZGhyZHZ5UQphSGVkMUcyZDYrbHROeHQ1S1pYU0Q3N1FaRGM3Rk5BWFkrTTFnZGFLT01xaEJUZWkwd0l2VGoxYUdtZWhJRXd3Cm9NK0NmSmloRXFMVS9USGxOVG1zaVIxRG1vMHBkZzNzQWMza3pGZ0hPMEVvUk0xYWlDNUV6SzRqeU1QeXhVYUQKaGNQWkx1ZVE2L3hMckpHRmlCN1Jxai9jT0FyM2ticGJCVUFxYXY1N0V3UEVxbnJHOGQ1Lzd3V25uelk5MkloNgpIRDlwVWJLemF2MFhYVVJsRUZxVXVEbElXZDFCWkdNS1FMTjF0NGtkYy9MWEI5RjlqU055NklaVXQzTjJCTklRCm9pOVpYbHE0RFlLeFdFRUZjVUIrZ2FVK3QrcTVEaERQM04wbwotLS0tLUVORCBDRVJUSUZJQ0FURS0tLS0tCg== + userTokenSOPS: 
ZXlKaGJHY2lPaUpTVXpJMU5pSXNJbXRwWkNJNklqbHpOSEJxTFROTlVqZFFha3R5ZURGVFVDMUdSRmxaWlRSaGVqbDRNbnAxYkZWV2NIUlZRVVpDWHpnaWZRLmV5SnBjM01pT2lKcmRXSmxjbTVsZEdWekwzTmxjblpwWTJWaFkyTnZkVzUwSWl3aWEzVmlaWEp1WlhSbGN5NXBieTl6WlhKMmFXTmxZV05qYjNWdWRDOXVZVzFsYzNCaFkyVWlPaUp6Y0dGeWF5MWhjSEJzYVdOaGRHbHZibk1pTENKcmRXSmxjbTVsZEdWekxtbHZMM05sY25acFkyVmhZMk52ZFc1MEwzTmxZM0psZEM1dVlXMWxJam9pYkc5allXd3RjM0JoY21zdGIzQmxjbUYwYjNJdGMzQmhjbXN0ZEc5clpXNHRiV3h3Y1cwaUxDSnJkV0psY201bGRHVnpMbWx2TDNObGNuWnBZMlZoWTJOdmRXNTBMM05sY25acFkyVXRZV05qYjNWdWRDNXVZVzFsSWpvaWJHOWpZV3d0YzNCaGNtc3RiM0JsY21GMGIzSXRjM0JoY21zaUxDSnJkV0psY201bGRHVnpMbWx2TDNObGNuWnBZMlZoWTJOdmRXNTBMM05sY25acFkyVXRZV05qYjNWdWRDNTFhV1FpT2lJek1qY3dPRGxqWWkwMU1XSm1MVFF3T0dVdE9XTTBaUzFsT0RjMllXSTRZamhpTWpVaUxDSnpkV0lpT2lKemVYTjBaVzA2YzJWeWRtbGpaV0ZqWTI5MWJuUTZjM0JoY21zdFlYQndiR2xqWVhScGIyNXpPbXh2WTJGc0xYTndZWEpyTFc5d1pYSmhkRzl5TFhOd1lYSnJJbjAuTDNCcTk2X3c2dlVQOVVYNnVzQmtvVFpzSVVJX0ZHbm9URFdxOHFRWVFPcnBXUkxqSkUzV0N5ZGxpS1QweWdZbmdfa1FhZ3J1VG1Mb2ZDeUc1dXBrMGd0UEllQi1Nel9aUlhXaldYdkRvME9yb0E0WFhzd1VsMnpSSkx2TEwzRlkwMWJWSXZVd0ZiSmpSNUhKdUtHa2o2U1haQ2ZxdVZRNHJPaDhqUWdJZkk4V2hMRGJlS2F5ZU16Rm5XS1lpdlpocllaeElQMzcteHJUY0V0NW9tZTVYR2hzZXh3UGhJRUdhY2E4NW1MbmV1czU5ZmhYdUhyNERrc3RURDFLYWhZaTBJMnFsdWtYdmtvbVdNRjVidDRwWTV5MkpVV3BXbTB2cHhOc3lPRDRUN01FZ2ZETkhwRDhmYUdVXzRzZlNLTkhXQkpaOEU0cG9PV1FZYko2WU9tM1V3 + userName: spark-operator + sparkApplicationNamespace: spark-applications + sparkServiceAccount: local-spark-operator-spark + sparkVersions: + - "3.2" + queues: + - poc + ttlSeconds: 259200 + sparkUIUrl: http://localhost:8080 + batchScheduler: yunikorn + sparkConf: + spark.kubernetes.executor.podNamePrefix: '{spark-application-resource-name}' + spark.eventLog.enabled: "true" + spark.eventLog.dir: s3a://bpg/eventlog + spark.history.fs.logDirectory: s3a://bpg/eventlog + spark.sql.warehouse.dir: s3a://bpg/warehouse + spark.sql.catalogImplementation: hive + spark.jars.ivy: /opt/spark/work-dir/.ivy2 + spark.hadoop.fs.s3a.connection.ssl.enabled: false + spark.hadoop.fs.s3a.access.key: claudia_sun + spark.hadoop.fs.s3a.secret.key: ad4ae70b65b67ad6a47026b9f128215a0200f7a3bd371d2e9a10f5a482397569 + spark.hadoop.fs.s3a.endpoint: 10.100.223.0:9878 + spark.hadoop.fs.s3a.impl: org.apache.hadoop.fs.s3a.S3AFileSystem + spark.hadoop.fs.s3a.change.detection.version.required: false + spark.hadoop.fs.s3a.change.detection.mode: none + spark.hadoop.fs.s3a.fast.upload: true + spark.jars.packages: org.apache.hadoop:hadoop-aws:3.2.2 + spark.hadoop.fs.s3a.aws.credentials.provider: org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider + sparkUIOptions: + ServicePort: 4040 + ingressAnnotations: + nginx.ingress.kubernetes.io/rewrite-target: /$2 + nginx.ingress.kubernetes.io/proxy-redirect-from: http://$host/ + nginx.ingress.kubernetes.io/proxy-redirect-to: /spark-applications-4/{spark-application-resource-name}/ + kubernetes.io/ingress.class: nginx + nginx.ingress.kubernetes.io/configuration-snippet: | + proxy_set_header Accept-Encoding ""; # disable compression + sub_filter_last_modified off; + sub_filter '' ' '; # add base url + sub_filter 'href="/' 'href="'; # remove absolute URL path so base url applies + sub_filter 'src="/' 'src="'; # remove absolute URL path so base url applies + + sub_filter '/{{num}}/jobs/' '/jobs/'; + + sub_filter "setUIRoot('')" "setUIRoot('/spark-applications-4/{spark-application-resource-name}/')"; # Set UI root for JS scripts + sub_filter "document.baseURI.split" "document.documentURI.split"; # Executors page issue fix + sub_filter_once off; + ingressTLS: + - hosts: + - 
localhost + secretName: localhost-tls-secret + driver: + env: + - name: STATSD_SERVER_IP + valueFrom: + fieldRef: + fieldPath: status.hostIP + - name: STATSD_SERVER_PORT + value: "8125" + executor: + env: + - name: STATSD_SERVER_IP + valueFrom: + fieldRef: + fieldPath: status.hostIP + - name: STATSD_SERVER_PORT + value: "8125" +sparkImages: + - name: apache/spark-py:v3.2.2 + types: + - Python + version: "3.2" + - name: apache/spark:v3.2.2 + types: + - Java + - Scala + version: "3.2" +s3Bucket: bpg +s3Folder: uploaded +sparkLogS3Bucket: bpg +sparkLogIndex: index/index.txt +batchFileLimit: 2016 +sparkHistoryDns: localhost +gatewayDns: localhost +sparkHistoryUrl: http://localhost:8088 +allowedUsers: + - '*' +blockedUsers: [] +queues: + - name: poc + maxRunningMillis: 21600000 +queueTokenSOPS: {} +dbStorageSOPS: + connectionString: jdbc:postgresql://localhost:5432/bpg?useUnicode=yes&characterEncoding=UTF-8&useLegacyDatetimeCode=false&connectTimeout=10000&socketTimeout=30000 + user: bpg + password: samplepass +statusCacheExpireMillis: 9000 +server: + applicationConnectors: + - type: http + port: 8080 +logging: + level: INFO + loggers: + com.apple.spark: INFO +sops: {} diff --git a/pom.xml b/pom.xml index 5ca9526..f36ab49 100644 --- a/pom.xml +++ b/pom.xml @@ -102,7 +102,7 @@ io.fabric8 kubernetes-client - 4.13.3 + 6.5.0 io.micrometer diff --git a/src/main/java/com/apple/spark/api/SubmissionStatus.java b/src/main/java/com/apple/spark/api/SubmissionStatus.java index ea78594..ef29f70 100644 --- a/src/main/java/com/apple/spark/api/SubmissionStatus.java +++ b/src/main/java/com/apple/spark/api/SubmissionStatus.java @@ -23,7 +23,7 @@ import static com.apple.spark.core.SparkConstants.SUBMITTED_STATE; import com.apple.spark.core.Constants; -import com.apple.spark.operator.SparkApplicationResource; +import com.apple.spark.operator.SparkApplication; import com.apple.spark.operator.SparkApplicationStatus; import com.apple.spark.util.DateTimeUtils; import com.fasterxml.jackson.annotation.JsonInclude; @@ -39,7 +39,7 @@ public class SubmissionStatus { private String applicationState; private String applicationErrorMessage; - public void copyFrom(SparkApplicationResource sparkApplicationResource) { + public void copyFrom(SparkApplication sparkApplicationResource) { Long creationTime = DateTimeUtils.parseOrNull(sparkApplicationResource.getMetadata().getCreationTimestamp()); this.setCreationTime(creationTime); diff --git a/src/main/java/com/apple/spark/api/SubmissionSummary.java b/src/main/java/com/apple/spark/api/SubmissionSummary.java index 1981922..89d8a3f 100644 --- a/src/main/java/com/apple/spark/api/SubmissionSummary.java +++ b/src/main/java/com/apple/spark/api/SubmissionSummary.java @@ -24,7 +24,7 @@ import com.apple.spark.AppConfig; import com.apple.spark.core.Constants; import com.apple.spark.core.SparkConstants; -import com.apple.spark.operator.SparkApplicationResource; +import com.apple.spark.operator.SparkApplication; import com.apple.spark.operator.SparkApplicationSpec; import com.apple.spark.util.ConfigUtil; import com.fasterxml.jackson.annotation.JsonInclude; @@ -86,7 +86,7 @@ public void setApplicationName(String applicationName) { } public void copyFrom( - SparkApplicationResource sparkApplicationResource, + SparkApplication sparkApplicationResource, AppConfig.SparkCluster sparkCluster, AppConfig appConfig) { this.copyFrom(sparkApplicationResource); @@ -103,7 +103,7 @@ public void copyFrom( } @Override - public void copyFrom(SparkApplicationResource sparkApplicationResource) { + public void 
copyFrom(SparkApplication sparkApplicationResource) { super.copyFrom(sparkApplicationResource); setSubmissionId(sparkApplicationResource.getMetadata().getName()); if (sparkApplicationResource.getMetadata().getLabels() != null) { diff --git a/src/main/java/com/apple/spark/core/ApplicationMonitor.java b/src/main/java/com/apple/spark/core/ApplicationMonitor.java index 2297500..b707334 100644 --- a/src/main/java/com/apple/spark/core/ApplicationMonitor.java +++ b/src/main/java/com/apple/spark/core/ApplicationMonitor.java @@ -24,18 +24,14 @@ import com.apple.spark.AppConfig; import com.apple.spark.operator.DriverSpec; import com.apple.spark.operator.ExecutorSpec; -import com.apple.spark.operator.SparkApplicationResource; -import com.apple.spark.operator.SparkApplicationResourceList; +import com.apple.spark.operator.SparkApplication; import com.apple.spark.operator.SparkApplicationSpec; import com.apple.spark.util.CounterMetricContainer; import com.apple.spark.util.DateTimeUtils; import com.apple.spark.util.GaugeMetricContainer; import com.apple.spark.util.KubernetesClusterAndNamespace; import io.fabric8.kubernetes.api.model.Pod; -import io.fabric8.kubernetes.client.DefaultKubernetesClient; import io.fabric8.kubernetes.client.KubernetesClient; -import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext; -import io.fabric8.kubernetes.client.dsl.base.OperationContext; import io.fabric8.kubernetes.client.informers.ResourceEventHandler; import io.fabric8.kubernetes.client.informers.SharedIndexInformer; import io.fabric8.kubernetes.client.informers.SharedInformerFactory; @@ -157,7 +153,7 @@ public void start() { String.format( "%s/%s", SparkConstants.SPARK_APPLICATION_CRD_GROUP, SparkConstants.CRD_VERSION), SparkConstants.SPARK_APPLICATION_KIND, - SparkApplicationResource.class); + SparkApplication.class); if (appConfig.getSparkClusters() != null) { Map uniqueClusters = new HashMap<>(); @@ -195,32 +191,27 @@ private void start(AppConfig.SparkCluster sparkCluster, Timer timer) { sparkCluster.getEksCluster(), sparkCluster.getSparkApplicationNamespace()); - DefaultKubernetesClient client = KubernetesHelper.getK8sClient(sparkCluster); + KubernetesClient client = KubernetesHelper.getK8sClient(sparkCluster); + clients.add(client); SharedInformerFactory sharedInformerFactory = client.informers(); informerFactories.add(sharedInformerFactory); - CustomResourceDefinitionContext crdContext = KubernetesHelper.getSparkApplicationCrdContext(); - SharedIndexInformer informer = - sharedInformerFactory.sharedIndexInformerForCustomResource( - crdContext, - SparkApplicationResource.class, - SparkApplicationResourceList.class, - new OperationContext().withNamespace(sparkCluster.getSparkApplicationNamespace()), - RESYNC_MILLIS); + SharedIndexInformer informer = + sharedInformerFactory.sharedIndexInformerFor(SparkApplication.class, RESYNC_MILLIS); RunningApplicationMonitor runningApplicationMonitor = new RunningApplicationMonitor(sparkCluster, timer, meterRegistry); informer.addEventHandler( - new ResourceEventHandler() { + new ResourceEventHandler() { @Override - public void onAdd(SparkApplicationResource sparkApplicationResource) {} + public void onAdd(SparkApplication sparkApplicationResource) {} @Override public void onUpdate( - SparkApplicationResource prevCRDState, SparkApplicationResource newCRDState) { + SparkApplication prevCRDState, SparkApplication newCRDState) { int timeoutMillis = 100; try { boolean added = @@ -245,7 +236,7 @@ public void onUpdate( @Override public void onDelete( - 
SparkApplicationResource sparkApplicationResource,
+          SparkApplication sparkApplicationResource,
               boolean deletedFinalStateUnknown) {}
         });
 
@@ -285,8 +276,8 @@ public void close() {
    */
   private void onUpdateImpl_logApplication(
       AppConfig.SparkCluster sparkCluster,
-      SparkApplicationResource prevCRDState,
-      SparkApplicationResource currCRDState) {
+      SparkApplication prevCRDState,
+      SparkApplication currCRDState) {
     String submissionId = currCRDState.getMetadata().getName();
     String newState = SparkApplicationResourceHelper.getState(currCRDState);
     String oldState = SparkApplicationResourceHelper.getState(prevCRDState);
@@ -307,7 +298,7 @@ private void onUpdateImpl_logApplication(
     Timestamp startTime = null;
     if (!driverInfoCollectedSubIds.contains(submissionId)
         && newState.equals(SparkConstants.RUNNING_STATE)) {
-      try (DefaultKubernetesClient client = KubernetesHelper.getK8sClient(sparkCluster)) {
+      try (KubernetesClient client = KubernetesHelper.getK8sClient(sparkCluster)) {
         String driverStartTime;
         SparkApplicationSpec spec = currCRDState.getSpec();
         DriverSpec driverSpec = null;
diff --git a/src/main/java/com/apple/spark/core/ApplicationUpdateEvent.java b/src/main/java/com/apple/spark/core/ApplicationUpdateEvent.java
index 9d4e36f..61ba68e 100644
--- a/src/main/java/com/apple/spark/core/ApplicationUpdateEvent.java
+++ b/src/main/java/com/apple/spark/core/ApplicationUpdateEvent.java
@@ -20,13 +20,13 @@
 package com.apple.spark.core;
 
 import com.apple.spark.AppConfig;
-import com.apple.spark.operator.SparkApplicationResource;
+import com.apple.spark.operator.SparkApplication;
 
 /** The event generated whenever there's CRD update from an application. */
 public class ApplicationUpdateEvent {
 
-  private final SparkApplicationResource prevCRDState;
-  private final SparkApplicationResource newCRDState;
+  private final SparkApplication prevCRDState;
+  private final SparkApplication newCRDState;
 
   // The runningApplicationMonitor instance corresponding to the specific Spark cluster
   private final RunningApplicationMonitor runningApplicationMonitor;
@@ -36,8 +36,8 @@ public class ApplicationUpdateEvent {
 
   public ApplicationUpdateEvent(
       AppConfig.SparkCluster sparkCluster,
-      SparkApplicationResource prevCRDState,
-      SparkApplicationResource newCRDState,
+      SparkApplication prevCRDState,
+      SparkApplication newCRDState,
       RunningApplicationMonitor runningApplicationMonitor) {
     this.prevCRDState = prevCRDState;
     this.newCRDState = newCRDState;
@@ -45,11 +45,11 @@ public ApplicationUpdateEvent(
     this.runningApplicationMonitor = runningApplicationMonitor;
   }
 
-  public SparkApplicationResource getPrevCRDState() {
+  public SparkApplication getPrevCRDState() {
     return prevCRDState;
   }
 
-  public SparkApplicationResource getNewCRDState() {
+  public SparkApplication getNewCRDState() {
     return newCRDState;
   }
 
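For reference while reviewing the informer changes above: a minimal, self-contained sketch of the fabric8 6.x informer wiring. It is not part of the patch; it assumes a default kubeconfig and the SparkApplication class introduced later in this diff. In 6.x the informer is built from the resource class alone — the CustomResourceDefinitionContext, list class, and OperationContext arguments from 4.x are gone. One point worth double-checking in review: the old OperationContext.withNamespace(...) call pinned the informer to sparkApplicationNamespace, and sharedIndexInformerFor(SparkApplication.class, RESYNC_MILLIS) has no equivalent argument, so the watch scope now depends on how the client and factory are configured.

import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;

public class SparkApplicationInformerSketch {
  public static void main(String[] args) {
    // Client from the default kubeconfig; BPG builds one per Spark cluster instead.
    KubernetesClient client = new KubernetesClientBuilder().build();

    // fabric8 6.x: the CRD coordinates are inferred from @Group/@Version on the class.
    SharedIndexInformer<SparkApplication> informer =
        client.informers().sharedIndexInformerFor(SparkApplication.class, 60_000L); // resync every 60s

    informer.addEventHandler(
        new ResourceEventHandler<SparkApplication>() {
          @Override
          public void onAdd(SparkApplication app) {}

          @Override
          public void onUpdate(SparkApplication oldState, SparkApplication newState) {
            // ApplicationMonitor enqueues an ApplicationUpdateEvent at this point.
            System.out.println("update: " + newState.getMetadata().getName());
          }

          @Override
          public void onDelete(SparkApplication app, boolean deletedFinalStateUnknown) {}
        });

    client.informers().startAllRegisteredInformers();
  }
}

diff --git a/src/main/java/com/apple/spark/core/KubernetesHelper.java b/src/main/java/com/apple/spark/core/KubernetesHelper.java
index a4bff80..47f118c 100644
--- a/src/main/java/com/apple/spark/core/KubernetesHelper.java
+++ b/src/main/java/com/apple/spark/core/KubernetesHelper.java
@@ -21,11 +21,8 @@
 
 import com.apple.spark.AppConfig;
 import com.apple.spark.util.EndAwareInputStream;
-import io.fabric8.kubernetes.api.model.DoneablePod;
 import io.fabric8.kubernetes.api.model.Pod;
-import io.fabric8.kubernetes.client.Config;
-import io.fabric8.kubernetes.client.ConfigBuilder;
-import io.fabric8.kubernetes.client.DefaultKubernetesClient;
+import io.fabric8.kubernetes.client.*;
 import io.fabric8.kubernetes.client.dsl.LogWatch;
 import io.fabric8.kubernetes.client.dsl.PodResource;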
 import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext;
@@ -135,8 +132,7 @@ public static CustomResourceDefinitionContext getSparkApplicationCrdContext() {
 
   public static InputStream tryGetLogStream(
       DefaultKubernetesClient client, String namespace, String podName) {
-    PodResource<Pod, DoneablePod> podResource =
-        client.inNamespace(namespace).pods().withName(podName);
+    PodResource podResource = client.pods().inNamespace(namespace).withName(podName);
     if (podResource == null) {
       logger.info("Cannot get pod resource {}", podName);
       return null;
diff --git a/src/main/java/com/apple/spark/core/RestSubmissionsStreamingOutput.java b/src/main/java/com/apple/spark/core/RestSubmissionsStreamingOutput.java
index 3ac1f3f..51b3dac 100644
--- a/src/main/java/com/apple/spark/core/RestSubmissionsStreamingOutput.java
+++ b/src/main/java/com/apple/spark/core/RestSubmissionsStreamingOutput.java
@@ -21,7 +21,7 @@
 
 import com.apple.spark.AppConfig;
 import com.apple.spark.api.SubmissionSummary;
-import com.apple.spark.operator.SparkApplicationResource;
+import com.apple.spark.operator.SparkApplication;
 import com.apple.spark.operator.SparkApplicationResourceList;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import java.io.IOException;
@@ -39,12 +39,12 @@ protected void writeSubmissions(
     if (list == null) {
       return;
     }
-    List<SparkApplicationResource> sparkApplicationResources = list.getItems();
+    List<SparkApplication> sparkApplicationResources = list.getItems();
     if (sparkApplicationResources == null) {
       return;
     }
     ObjectMapper objectMapper = new ObjectMapper();
-    for (SparkApplicationResource sparkApplicationResource : sparkApplicationResources) {
+    for (SparkApplication sparkApplicationResource : sparkApplicationResources) {
       SubmissionSummary submission = new SubmissionSummary();
       submission.copyFrom(sparkApplicationResource, sparkCluster, appConfig);
       String str = objectMapper.writeValueAsString(submission);
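KubernetesHelper.getK8sClient is called throughout this patch, but its body is not in the diff. As a hedge for reviewers, here is a hypothetical sketch of what a 6.x implementation can look like — ClientFactorySketch and its parameter names are illustrative, mirroring the masterUrl/userTokenSOPS/caCertDataSOPS keys from the cluster config. DefaultKubernetesClient is deprecated in 6.x and KubernetesClientBuilder is the supported entry point; note that several call sites in this patch (e.g. tryGetLogStream above) still take DefaultKubernetesClient and could move to the KubernetesClient interface.

import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.ConfigBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;

public final class ClientFactorySketch {
  // Hypothetical equivalent of KubernetesHelper.getK8sClient(sparkCluster).
  public static KubernetesClient buildClient(String masterUrl, String token, String caCertData) {
    Config config =
        new ConfigBuilder()
            .withMasterUrl(masterUrl)   // SparkCluster.masterUrl
            .withOauthToken(token)      // decrypted userTokenSOPS
            .withCaCertData(caCertData) // decrypted caCertDataSOPS
            .build();
    return new KubernetesClientBuilder().withConfig(config).build();
  }
}

diff --git a/src/main/java/com/apple/spark/core/RunningApplicationMonitor.java b/src/main/java/com/apple/spark/core/RunningApplicationMonitor.java
index 1e9a258..0bf60b4 100644
--- a/src/main/java/com/apple/spark/core/RunningApplicationMonitor.java
+++ b/src/main/java/com/apple/spark/core/RunningApplicationMonitor.java
@@ -24,14 +24,14 @@
 
 import com.apple.spark.AppConfig;
 import com.apple.spark.operator.DriverInfo;
-import com.apple.spark.operator.SparkApplicationResource;
-import com.apple.spark.operator.SparkApplicationResourceDoneable;
+import com.apple.spark.operator.SparkApplication;
 import com.apple.spark.operator.SparkApplicationResourceList;
 import com.apple.spark.util.CounterMetricContainer;
 import com.apple.spark.util.DateTimeUtils;
 import com.apple.spark.util.GaugeMetricContainer;
-import io.fabric8.kubernetes.client.DefaultKubernetesClient;
-import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.dsl.MixedOperation;
+import io.fabric8.kubernetes.client.dsl.Resource;
 import io.micrometer.core.instrument.MeterRegistry;
 import io.micrometer.core.instrument.Tag;
 import java.util.List;
@@ -111,13 +111,13 @@ public void run() {
         deleteInterval);
   }
 
-  public static long getMaxRunningMillis(SparkApplicationResource sparkApplicationResource) {
-    if (sparkApplicationResource.getMetadata() == null
-        || sparkApplicationResource.getMetadata().getLabels() == null) {
+  public static long getMaxRunningMillis(SparkApplication sparkApplication) {
+    if (sparkApplication.getMetadata() == null
+        || sparkApplication.getMetadata().getLabels() == null) {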
       return Constants.DEFAULT_MAX_RUNNING_MILLIS;
     }
     String labelValue =
-        sparkApplicationResource.getMetadata().getLabels().get(Constants.MAX_RUNNING_MILLIS_LABEL);
+        sparkApplication.getMetadata().getLabels().get(Constants.MAX_RUNNING_MILLIS_LABEL);
     if (labelValue == null || labelValue.isEmpty()) {
       return Constants.DEFAULT_MAX_RUNNING_MILLIS;
     }
@@ -129,7 +129,7 @@ public static long getMaxRunningMillis(SparkApplication sparkApplication) {
               "Failed to parse value %s for label %s on %s",
               labelValue,
               Constants.MAX_RUNNING_MILLIS_LABEL,
-              sparkApplicationResource.getMetadata().getName()),
+              sparkApplication.getMetadata().getName()),
           ex);
       return Constants.DEFAULT_MAX_RUNNING_MILLIS;
     }
@@ -143,7 +143,7 @@ public static long getMaxRunningMillis(SparkApplication sparkApplication) {
    * @param currCRDState the current CRD state
    */
   public void onUpdate(
-      SparkApplicationResource prevCRDState, SparkApplicationResource currCRDState) {
+      SparkApplication prevCRDState, SparkApplication currCRDState) {
     String newState = SparkApplicationResourceHelper.getState(currCRDState);
     if (SparkConstants.RUNNING_STATE.equalsIgnoreCase(newState)) {
       String name = currCRDState.getMetadata().getName();
@@ -214,18 +214,14 @@ public int getApplicationCount() {
    * @param appName the name of the app (typically submission ID)
    */
   protected void killApplication(String namespace, String appName) {
-    try (DefaultKubernetesClient client = KubernetesHelper.getK8sClient(sparkCluster)) {
-      CustomResourceDefinitionContext crdContext = KubernetesHelper.getSparkApplicationCrdContext();
-      SparkApplicationResource sparkApplicationResource =
-          client
-              .customResources(
-                  crdContext,
-                  SparkApplicationResource.class,
-                  SparkApplicationResourceList.class,
-                  SparkApplicationResourceDoneable.class)
-              .inNamespace(namespace)
-              .withName(appName)
-              .get();
+    try (KubernetesClient client = KubernetesHelper.getK8sClient(sparkCluster)) {
+      MixedOperation<SparkApplication, SparkApplicationResourceList, Resource<SparkApplication>>
+          sparkApplicationClient =
+              client.resources(SparkApplication.class, SparkApplicationResourceList.class);
+
+      SparkApplication sparkApplicationResource =
+          sparkApplicationClient.inNamespace(namespace).withName(appName).get();
+
       if (sparkApplicationResource == null) {
         logger.warn(
             "Failed to kill application {}/{} due to application not found", namespace, appName);
diff --git a/src/main/java/com/apple/spark/core/SparkApplicationResourceHelper.java b/src/main/java/com/apple/spark/core/SparkApplicationResourceHelper.java
index 222808c..2ccf303 100644
--- a/src/main/java/com/apple/spark/core/SparkApplicationResourceHelper.java
+++ b/src/main/java/com/apple/spark/core/SparkApplicationResourceHelper.java
@@ -19,11 +19,11 @@
 
 package com.apple.spark.core;
 
-import com.apple.spark.operator.SparkApplicationResource;
+import com.apple.spark.operator.SparkApplication;
 
 public class SparkApplicationResourceHelper {
 
-  public static String getState(SparkApplicationResource sparkApp) {
+  public static String getState(SparkApplication sparkApp) {
     if (sparkApp.getStatus() != null && sparkApp.getStatus().getApplicationState() != null) {
       return sparkApp.getStatus().getApplicationState().getState();
     }
diff --git a/src/main/java/com/apple/spark/operator/SparkApplicationResource.java b/src/main/java/com/apple/spark/operator/SparkApplication.java
similarity index 86%
rename from src/main/java/com/apple/spark/operator/SparkApplicationResource.java
rename to src/main/java/com/apple/spark/operator/SparkApplication.java
index 460fb33..bbe0146 100644
--- a/src/main/java/com/apple/spark/operator/SparkApplicationResource.java
+++ b/src/main/java/com/apple/spark/operator/SparkApplication.java
@@ -22,10 +22,14 @@
 
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import io.fabric8.kubernetes.client.CustomResource;
+import io.fabric8.kubernetes.model.annotation.Group;
+import io.fabric8.kubernetes.model.annotation.Version;
 
+@Version("v1beta2")
+@Group("sparkoperator.k8s.io")
 @JsonInclude(JsonInclude.Include.NON_NULL)
 @JsonIgnoreProperties(ignoreUnknown = true)
-public class SparkApplicationResource extends CustomResource {
+public class SparkApplication extends CustomResource {
 
   private SparkApplicationSpec spec;
   private SparkApplicationStatus status;
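The @Group/@Version annotations above are what let the rest of this patch drop CustomResourceDefinitionContext and the Doneable class deleted below: in fabric8 6.x, apiVersion and kind are derived from the annotated class, and client.resources(...) returns a fully typed operation. A small sketch, assuming a default kubeconfig (the namespace literal is just the one from bpg-config.yaml):

import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
import io.fabric8.kubernetes.client.dsl.MixedOperation;
import io.fabric8.kubernetes.client.dsl.Resource;

public class TypedCrdSketch {
  public static void main(String[] args) {
    SparkApplication app = new SparkApplication();
    // Both values come from the annotations on the class, set by the
    // CustomResource constructor:
    System.out.println(app.getApiVersion()); // sparkoperator.k8s.io/v1beta2
    System.out.println(app.getKind());       // SparkApplication

    try (KubernetesClient client = new KubernetesClientBuilder().build()) {
      MixedOperation<SparkApplication, SparkApplicationResourceList, Resource<SparkApplication>>
          sparkApps = client.resources(SparkApplication.class, SparkApplicationResourceList.class);
      sparkApps.inNamespace("spark-applications").list()
          .getItems()
          .forEach(a -> System.out.println(a.getMetadata().getName()));
    }
  }
}

Given this, the explicit setApiVersion/setKind calls kept in submitSparkCRD later in this patch are likely redundant, though harmless.

diff --git a/src/main/java/com/apple/spark/operator/SparkApplicationResourceDoneable.java b/src/main/java/com/apple/spark/operator/SparkApplicationResourceDoneable.java
deleted file mode 100644
index f3c20c8..0000000
--- a/src/main/java/com/apple/spark/operator/SparkApplicationResourceDoneable.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- *
- * This source file is part of the Batch Processing Gateway open source project
- *
- * Copyright 2022 Apple Inc. and the Batch Processing Gateway project authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License. 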
- */ - -package com.apple.spark.operator; - -import io.fabric8.kubernetes.api.builder.Function; -import io.fabric8.kubernetes.client.CustomResourceDoneable; - -public class SparkApplicationResourceDoneable - extends CustomResourceDoneable { - - public SparkApplicationResourceDoneable( - SparkApplicationResource resource, - Function function) { - super(resource, function); - } -} diff --git a/src/main/java/com/apple/spark/operator/SparkApplicationResourceList.java b/src/main/java/com/apple/spark/operator/SparkApplicationResourceList.java index dbc133c..06eeaeb 100644 --- a/src/main/java/com/apple/spark/operator/SparkApplicationResourceList.java +++ b/src/main/java/com/apple/spark/operator/SparkApplicationResourceList.java @@ -21,4 +21,4 @@ import io.fabric8.kubernetes.client.CustomResourceList; -public class SparkApplicationResourceList extends CustomResourceList {} +public class SparkApplicationResourceList extends CustomResourceList {} diff --git a/src/main/java/com/apple/spark/rest/ApplicationGetLogRest.java b/src/main/java/com/apple/spark/rest/ApplicationGetLogRest.java index edb4005..6070303 100644 --- a/src/main/java/com/apple/spark/rest/ApplicationGetLogRest.java +++ b/src/main/java/com/apple/spark/rest/ApplicationGetLogRest.java @@ -34,7 +34,7 @@ import com.apple.spark.core.KubernetesHelper; import com.apple.spark.core.LogDao; import com.apple.spark.core.RestStreamingOutput; -import com.apple.spark.operator.SparkApplicationResource; +import com.apple.spark.operator.SparkApplication; import com.apple.spark.security.User; import com.apple.spark.util.ExceptionUtils; import com.codahale.metrics.MetricRegistry; @@ -192,7 +192,7 @@ public Response getLog( // If s3only is true, skip searching EKS. if (s3only.equalsIgnoreCase("false")) { try { - final SparkApplicationResource sparkApplicationResource = getSparkApplicationResource(id); + final SparkApplication sparkApplicationResource = getSparkApplicationResource(id); logStream = getLog(sparkApplicationResource, execId); } catch (Throwable ex) { ExceptionUtils.meterException(); @@ -300,7 +300,7 @@ private AmazonS3 getS3Client() { } @ExceptionMetered(name = "RuntimeException", absolute = true, cause = RuntimeException.class) - private InputStream getLog(SparkApplicationResource sparkApplication, String execId) { + private InputStream getLog(SparkApplication sparkApplication, String execId) { if (sparkApplication == null) { logger.info("Cannot get log from EKS, spark application not found"); return null; diff --git a/src/main/java/com/apple/spark/rest/ApplicationSubmissionRest.java b/src/main/java/com/apple/spark/rest/ApplicationSubmissionRest.java index c65674d..7b48401 100644 --- a/src/main/java/com/apple/spark/rest/ApplicationSubmissionRest.java +++ b/src/main/java/com/apple/spark/rest/ApplicationSubmissionRest.java @@ -43,8 +43,7 @@ import com.apple.spark.api.SubmitApplicationResponse; import com.apple.spark.core.*; import com.apple.spark.operator.DriverInfo; -import com.apple.spark.operator.SparkApplicationResource; -import com.apple.spark.operator.SparkApplicationResourceDoneable; +import com.apple.spark.operator.SparkApplication; import com.apple.spark.operator.SparkApplicationResourceList; import com.apple.spark.operator.SparkApplicationSpec; import com.apple.spark.security.User; @@ -69,6 +68,9 @@ import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.api.model.PodStatus; import io.fabric8.kubernetes.client.DefaultKubernetesClient; +import io.fabric8.kubernetes.client.KubernetesClient; +import 
io.fabric8.kubernetes.client.dsl.MixedOperation; +import io.fabric8.kubernetes.client.dsl.Resource; import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Tag; @@ -154,7 +156,7 @@ public GetSubmissionStatusResponseCacheValue load(String submissionId) { AppConfig.SparkCluster sparkCluster = getSparkCluster(submissionId); try { GetSubmissionStatusResponse response = - getStatusImplWithoutCache(submissionId, sparkCluster); + getStatusImplWithoutCache(getSparkApplicationResource(submissionId), submissionId, sparkCluster); return new GetSubmissionStatusResponseCacheValue(response); } catch (Throwable ex) { requestCounters.increment( @@ -343,21 +345,21 @@ private SubmitApplicationResponse submitSparkCRD( com.codahale.metrics.Timer timer = registry.timer(this.getClass().getSimpleName() + ".submitApplication.k8s-time"); - try (DefaultKubernetesClient client = KubernetesHelper.getK8sClient(sparkCluster); - com.codahale.metrics.Timer.Context context = timer.time()) { - SparkApplicationResource sparkApplicationResource = new SparkApplicationResource(); - sparkApplicationResource.setApiVersion(SparkConstants.SPARK_OPERATOR_API_VERSION); - sparkApplicationResource.setKind(SparkConstants.SPARK_APPLICATION_KIND); - sparkApplicationResource.getMetadata().setName(submissionId); - sparkApplicationResource + try (KubernetesClient client = KubernetesHelper.getK8sClient(sparkCluster); + com.codahale.metrics.Timer.Context context = timer.time()) { + SparkApplication sparkApplication = new SparkApplication(); + sparkApplication.setApiVersion(SparkConstants.SPARK_OPERATOR_API_VERSION); + sparkApplication.setKind(SparkConstants.SPARK_APPLICATION_KIND); + sparkApplication.getMetadata().setName(submissionId); + sparkApplication .getMetadata() .setNamespace(sparkCluster.getSparkApplicationNamespace()); - if (sparkApplicationResource.getMetadata().getLabels() == null) { - sparkApplicationResource.getMetadata().setLabels(new HashMap<>()); + if (sparkApplication.getMetadata().getLabels() == null) { + sparkApplication.getMetadata().setLabels(new HashMap<>()); } if (sparkSpec.getProxyUser() != null) { - sparkApplicationResource + sparkApplication .getMetadata() .getLabels() .put(PROXY_USER_LABEL, sparkSpec.getProxyUser()); @@ -365,13 +367,13 @@ private SubmitApplicationResponse submitSparkCRD( if (request.getApplicationName() != null) { String applicationNameLabelValue = KubernetesHelper.normalizeLabelValue(request.getApplicationName()); - sparkApplicationResource + sparkApplication .getMetadata() .getLabels() .put(APPLICATION_NAME_LABEL, applicationNameLabelValue); } - sparkApplicationResource + sparkApplication .getMetadata() .getLabels() .put(QUEUE_LABEL, YUNIKORN_ROOT_QUEUE + "." 
+ queue); @@ -400,7 +402,7 @@ private SubmitApplicationResponse submitSparkCRD( } } } - sparkApplicationResource + sparkApplication .getMetadata() .getLabels() .put(MAX_RUNNING_MILLIS_LABEL, String.valueOf(maxRunningMillis)); @@ -412,16 +414,13 @@ private SubmitApplicationResponse submitSparkCRD( logger.warn("Failed to serialize SparkApplicationSpec and mask sensitive info", ex); } - sparkApplicationResource.setSpec(sparkSpec); - CustomResourceDefinitionContext crdContext = KubernetesHelper.getSparkApplicationCrdContext(); - client - .customResources( - crdContext, - SparkApplicationResource.class, - SparkApplicationResourceList.class, - SparkApplicationResourceDoneable.class) - .create(sparkApplicationResource); + sparkApplication.setSpec(sparkSpec); + + MixedOperation> + sparkApplicationClient = + client.resources(SparkApplication.class, SparkApplicationResourceList.class); + sparkApplicationClient.create(sparkApplication); SubmitApplicationResponse response = new SubmitApplicationResponse(); response.setSubmissionId(submissionId); context.stop(); @@ -465,18 +464,16 @@ public DeleteSubmissionResponse deleteSubmission( AppConfig.SparkCluster sparkCluster = getSparkCluster(submissionId); com.codahale.metrics.Timer timer = registry.timer(this.getClass().getSimpleName() + ".deleteSubmission.k8s-time"); - try (DefaultKubernetesClient client = KubernetesHelper.getK8sClient(sparkCluster); + try (KubernetesClient client = KubernetesHelper.getK8sClient(sparkCluster); com.codahale.metrics.Timer.Context context = timer.time()) { - CustomResourceDefinitionContext crdContext = KubernetesHelper.getSparkApplicationCrdContext(); - client - .customResources( - crdContext, - SparkApplicationResource.class, - SparkApplicationResourceList.class, - SparkApplicationResourceDoneable.class) - .inNamespace(sparkCluster.getSparkApplicationNamespace()) - .withName(submissionId) - .delete(); + MixedOperation> + sparkApplicationClient = + client.resources(SparkApplication.class, SparkApplicationResourceList.class); + + sparkApplicationClient + .inNamespace(sparkCluster.getSparkApplicationNamespace()) + .withName(submissionId) + .delete(); context.stop(); return new DeleteSubmissionResponse(); } @@ -515,9 +512,9 @@ public SparkApplicationSpec getSparkSpec( clientVersion); requestCounters.increment( REQUEST_METRIC_NAME, Tag.of("name", "get_spec"), Tag.of("user", user.getName())); - SparkApplicationResource sparkApplicationResource = getSparkApplicationResource(submissionId); + SparkApplication sparkApplication = getSparkApplicationResource(submissionId); SparkApplicationSpec sparkApplicationSpec = - removeEnvFromSpec(sparkApplicationResource.getSpec()); + removeEnvFromSpec(sparkApplication.getSpec()); return sparkApplicationSpec; } @@ -561,13 +558,13 @@ private GetSubmissionStatusResponse getStatusImpl(String submissionId, User user STATUS_CACHE_GET_FAILURE, Tag.of("exception", ex.getClass().getSimpleName())); logger.warn(String.format("Failed to get status from cache for %s", submissionId), ex); AppConfig.SparkCluster sparkCluster = getSparkCluster(submissionId); - return getStatusImplWithoutCache(submissionId, sparkCluster); + return getStatusImplWithoutCache(getSparkApplicationResource(submissionId), submissionId, sparkCluster); } if (cacheValue == null) { logger.warn("Got null status cache value for {}", submissionId); AppConfig.SparkCluster sparkCluster = getSparkCluster(submissionId); - return getStatusImplWithoutCache(submissionId, sparkCluster); + return 
getStatusImplWithoutCache(getSparkApplicationResource(submissionId), submissionId, sparkCluster); } long cacheElapsedTime = System.currentTimeMillis() - cacheValue.getCreatedTimeMillis(); @@ -578,7 +575,7 @@ private GetSubmissionStatusResponse getStatusImpl(String submissionId, User user logger.warn( "Got expired status cache value ({} millis) for {}", cacheElapsedTime, submissionId); AppConfig.SparkCluster sparkCluster = getSparkCluster(submissionId); - return getStatusImplWithoutCache(submissionId, sparkCluster); + return getStatusImplWithoutCache(getSparkApplicationResource(submissionId), submissionId, sparkCluster); } if (cacheValue.getResponse() == null) { @@ -603,24 +600,8 @@ private GetSubmissionStatusResponse getStatusImpl(String submissionId, User user } private GetSubmissionStatusResponse getStatusImplWithoutCache( - String submissionId, AppConfig.SparkCluster sparkCluster) { - com.codahale.metrics.Timer timer = - registry.timer(this.getClass().getSimpleName() + ".getStatus.k8s-time"); - try (DefaultKubernetesClient client = KubernetesHelper.getK8sClient(sparkCluster); - com.codahale.metrics.Timer.Context context = timer.time()) { - CustomResourceDefinitionContext crdContext = KubernetesHelper.getSparkApplicationCrdContext(); - - SparkApplicationResource sparkApplication = - client - .customResources( - crdContext, - SparkApplicationResource.class, - SparkApplicationResourceList.class, - SparkApplicationResourceDoneable.class) - .inNamespace(sparkCluster.getSparkApplicationNamespace()) - .withName(submissionId) - .get(); - context.stop(); + SparkApplication sparkApplication, String submissionId, AppConfig.SparkCluster sparkCluster) { + if (sparkApplication == null) { throw new WebApplicationException( String.format("Application submission %s not found", submissionId), @@ -678,7 +659,6 @@ private GetSubmissionStatusResponse getStatusImplWithoutCache( } return response; - } } @GET() @@ -716,7 +696,7 @@ public GetDriverInfoResponse getDriverInfo( REQUEST_METRIC_NAME, Tag.of("name", "get_driver"), Tag.of("user", user.getName())); AppConfig.SparkCluster sparkCluster = getSparkCluster(submissionId); - SparkApplicationResource sparkApplicationResource = getSparkApplicationResource(submissionId); + SparkApplication sparkApplicationResource = getSparkApplicationResource(submissionId); if (sparkApplicationResource.getStatus() == null || sparkApplicationResource.getStatus().getDriverInfo() == null) { return new GetDriverInfoResponse(); @@ -770,7 +750,7 @@ public Response describe( Tag.of("name", "describe_application"), Tag.of("user", user.getName())); - final SparkApplicationResource sparkApplicationResource = + final SparkApplication sparkApplicationResource = getSparkApplicationResource(submissionId); SparkApplicationSpec sparkApplicationSpec = removeEnvFromSpec(sparkApplicationResource.getSpec()); @@ -862,9 +842,9 @@ public GetMySubmissionsResponse getSubmissions( for (AppConfig.SparkCluster sparkCluster : getSparkClusters()) { SparkApplicationResourceList list = getSparkApplicationResourcesByUser(sparkCluster, user.getName()); - List sparkApplicationResources = list.getItems(); + List sparkApplicationResources = list.getItems(); if (sparkApplicationResources != null) { - for (SparkApplicationResource sparkApplicationResource : sparkApplicationResources) { + for (SparkApplication sparkApplicationResource : sparkApplicationResources) { SubmissionSummary submission = new SubmissionSummary(); submission.copyFrom(sparkApplicationResource, sparkCluster, getAppConfig()); 
submissionList.add(submission);
         }
       }
     }
@@ -884,11 +864,12 @@ protected List<Event> getEvents(AppConfig.SparkCluster sparkCluster, String obje
     try (DefaultKubernetesClient client = KubernetesHelper.getK8sClient(sparkCluster);
         com.codahale.metrics.Timer.Context context = timer.time()) {
       EventList eventList =
-          client
-              .events()
-              .inNamespace(sparkCluster.getSparkApplicationNamespace())
-              .withFields(fields)
-              .list();
+          client
+              .v1()
+              .events()
+              .inNamespace(sparkCluster.getSparkApplicationNamespace())
+              .withFields(fields)
+              .list();
       context.stop();
       if (eventList == null) {
         return Collections.EMPTY_LIST;
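The only change to getEvents is the v1() hop: in fabric8 6.x the core-v1 Event API moved under client.v1(). A minimal sketch of the same lookup outside BPG — the namespace and the field-selector value are placeholders:

import io.fabric8.kubernetes.api.model.EventList;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
import java.util.Map;

public class EventLookupSketch {
  public static void main(String[] args) {
    try (KubernetesClient client = new KubernetesClientBuilder().build()) {
      EventList events =
          client
              .v1()      // 6.x: core-v1 events live under v1()
              .events()
              .inNamespace("spark-applications")
              .withFields(Map.of("involvedObject.name", "example-submission-id"))
              .list();
      events.getItems().forEach(e -> System.out.println(e.getMessage()));
    }
  }
}

diff --git a/src/main/java/com/apple/spark/rest/RestBase.java b/src/main/java/com/apple/spark/rest/RestBase.java
index f6989b6..5ffb2ed 100644
--- a/src/main/java/com/apple/spark/rest/RestBase.java
+++ b/src/main/java/com/apple/spark/rest/RestBase.java
@@ -23,8 +23,7 @@
 import com.apple.spark.core.ApplicationSubmissionHelper;
 import com.apple.spark.core.Constants;
 import com.apple.spark.core.KubernetesHelper;
-import com.apple.spark.operator.SparkApplicationResource;
-import com.apple.spark.operator.SparkApplicationResourceDoneable;
+import com.apple.spark.operator.SparkApplication;
 import com.apple.spark.operator.SparkApplicationResourceList;
 import com.apple.spark.util.CounterMetricContainer;
 import com.apple.spark.util.TimerMetricContainer;
@@ -33,7 +32,9 @@
 import com.google.common.util.concurrent.RateLimiter;
 import io.fabric8.kubernetes.api.model.Pod;
 import io.fabric8.kubernetes.client.DefaultKubernetesClient;
-import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.dsl.MixedOperation;
+import io.fabric8.kubernetes.client.dsl.Resource;
 import io.micrometer.core.instrument.MeterRegistry;
 import io.swagger.v3.oas.annotations.OpenAPIDefinition;
 import io.swagger.v3.oas.annotations.info.Info;
@@ -114,23 +115,23 @@ protected AppConfig.SparkCluster getSparkCluster(String submissionId) {
     return sparkClusterOptional.get();
   }
 
-  protected SparkApplicationResource getSparkApplicationResource(String submissionId) {
+  protected SparkApplication getSparkApplicationResource(String submissionId) {
     AppConfig.SparkCluster sparkCluster = getSparkCluster(submissionId);
     com.codahale.metrics.Timer timer =
         registry.timer(this.getClass().getSimpleName() + ".getSparkApplicationResource.k8s-time");
-    try (DefaultKubernetesClient client = KubernetesHelper.getK8sClient(sparkCluster);
-        com.codahale.metrics.Timer.Context context = timer.time()) {
-      CustomResourceDefinitionContext crdContext = KubernetesHelper.getSparkApplicationCrdContext();
-      SparkApplicationResource sparkApplication =
-          client
-              .customResources(
-                  crdContext,
-                  SparkApplicationResource.class,
-                  SparkApplicationResourceList.class,
-                  SparkApplicationResourceDoneable.class)
-              .inNamespace(sparkCluster.getSparkApplicationNamespace())
-              .withName(submissionId)
-              .get();
+    try (KubernetesClient client = KubernetesHelper.getK8sClient(sparkCluster);
+        com.codahale.metrics.Timer.Context context = timer.time()) {
+
+      MixedOperation<SparkApplication, SparkApplicationResourceList, Resource<SparkApplication>>
+          sparkApplicationClient =
+              client.resources(SparkApplication.class, SparkApplicationResourceList.class);
+
+      SparkApplication sparkApplication =
+          sparkApplicationClient
+              .inNamespace(sparkCluster.getSparkApplicationNamespace())
+              .withName(submissionId)
+              .get();
+
       context.stop();
       if (sparkApplication == null) {
         throw new WebApplicationException(Response.Status.NOT_FOUND);
@@ -151,17 +152,15 @@ protected SparkApplicationResourceList getSparkApplicationResourcesByLabel(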
         this.getClass().getSimpleName() + ".getSparkApplicationResourcesByUser.k8s-time");
     try (DefaultKubernetesClient client = KubernetesHelper.getK8sClient(sparkCluster);
         com.codahale.metrics.Timer.Context context = timer.time()) {
-      CustomResourceDefinitionContext crdContext = KubernetesHelper.getSparkApplicationCrdContext();
+      MixedOperation<SparkApplication, SparkApplicationResourceList, Resource<SparkApplication>>
+          sparkApplicationClient =
+              client.resources(SparkApplication.class, SparkApplicationResourceList.class);
+
       SparkApplicationResourceList list =
-          client
-              .customResources(
-                  crdContext,
-                  SparkApplicationResource.class,
-                  SparkApplicationResourceList.class,
-                  SparkApplicationResourceDoneable.class)
-              .inNamespace(sparkCluster.getSparkApplicationNamespace())
-              .withLabel(labelName, labelValue)
-              .list();
+          sparkApplicationClient
+              .inNamespace(sparkCluster.getSparkApplicationNamespace())
+              .withLabel(labelName, labelValue)
+              .list();
       context.stop();
       if (list == null) {
         return new SparkApplicationResourceList();
@@ -176,16 +175,13 @@ protected SparkApplicationResourceList getSparkApplicationResources(
       registry.timer(this.getClass().getSimpleName() + ".getSparkApplicationResources.k8s-time");
     try (DefaultKubernetesClient client = KubernetesHelper.getK8sClient(sparkCluster);
         com.codahale.metrics.Timer.Context context = timer.time()) {
-      CustomResourceDefinitionContext crdContext = KubernetesHelper.getSparkApplicationCrdContext();
+      MixedOperation<SparkApplication, SparkApplicationResourceList, Resource<SparkApplication>>
+          sparkApplicationClient =
+              client.resources(SparkApplication.class, SparkApplicationResourceList.class);
+
       SparkApplicationResourceList list =
-          client
-              .customResources(
-                  crdContext,
-                  SparkApplicationResource.class,
-                  SparkApplicationResourceList.class,
-                  SparkApplicationResourceDoneable.class)
-              .inNamespace(sparkCluster.getSparkApplicationNamespace())
-              .list();
+          sparkApplicationClient.inNamespace(sparkCluster.getSparkApplicationNamespace()).list();
+
       context.stop();
       if (list == null) {
         return new SparkApplicationResourceList();
diff --git a/src/main/java/com/apple/spark/util/HttpUtils.java b/src/main/java/com/apple/spark/util/HttpUtils.java
index 2e36d4d..59ab18f 100644
--- a/src/main/java/com/apple/spark/util/HttpUtils.java
+++ b/src/main/java/com/apple/spark/util/HttpUtils.java
@@ -21,70 +21,63 @@
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import java.io.IOException;
 import java.io.InputStream;
-import java.security.KeyManagementException;
-import java.security.NoSuchAlgorithmException;
-import java.security.cert.CertificateException;
-import java.security.cert.X509Certificate;
-import java.util.concurrent.TimeUnit;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLSocketFactory;
-import javax.net.ssl.TrustManager;
-import javax.net.ssl.X509TrustManager;
-import okhttp3.MediaType;
-import okhttp3.OkHttpClient;
-import okhttp3.Request;
-import okhttp3.RequestBody;
-import okhttp3.Response;
-import okio.BufferedSink;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
 
 public class HttpUtils {
 
   public static <T> T post(
-      String url, String requestJson, String headerName, String headerValue, Class<T> clazz) {
-    MediaType mediaType = MediaType.parse("application/json");
-    return post(url, requestJson, mediaType, headerName, headerValue, clazz);
+      String url, String requestJson, String headerName, String headerValue, Class<T> clazz) {
+    return post(url, requestJson, "application/json", headerName, headerValue, clazz);
   }
 
   public static <T> T post(
-      String url,
-      String requestText,
-      MediaType mediaType,
-      String headerName,
-      String headerValue,
-      Class<T> clazz) {
-    String str = post(url, requestText, mediaType, headerName, headerValue);
+      String url,
+      String requestText,
+      String contentType,
+      String headerName,
+      String headerValue,
+      Class<T> clazz) {
+    String str = post(url, requestText, contentType, headerName, headerValue);
     return parseJson(str, clazz);
   }
 
   public static String post(String url, String requestJson, String headerName, String headerValue) {
-    MediaType mediaType = MediaType.parse("application/json");
-    return post(url, requestJson, mediaType, headerName, headerValue);
+    return post(url, requestJson, "application/json", headerName, headerValue);
   }
 
   public static String post(
-      String url, String requestText, MediaType mediaType, String headerName, String headerValue) {
-    OkHttpClient client = getOkHttpClient();
-    RequestBody body = RequestBody.create(mediaType, requestText);
-    Request request =
-        new Request.Builder().url(url).header(headerName, headerValue).post(body).build();
-    return executeHttpRequest(url, client, request);
+      String url, String requestText, String contentType, String headerName, String headerValue) {
+    HttpRequest request = null;
+    try {
+      var builder = HttpRequest.newBuilder().uri(new URI(url)).header("Content-Type", contentType);
+      if (headerName != null && !headerName.isEmpty()) {
+        builder = builder.header(headerName, headerValue);
+      }
+      request = builder.POST(HttpRequest.BodyPublishers.ofString(requestText)).build();
+      return executeHttpRequest(url, request, HttpResponse.BodyHandlers.ofString());
+    } catch (Throwable ex) {
+      // request may still be null if URI construction failed, so don't call request.method() here
+      throw new RuntimeException(String.format("Failed to execute POST on %s", url), ex);
+    }
+  }
 
   public static <T> T post(
-      String url, InputStream stream, String headerName, String headerValue, Class<T> clazz) {
+      String url, InputStream stream, String headerName, String headerValue, Class<T> clazz) {
     String str = post(url, stream, headerName, headerValue);
     return parseJson(str, clazz);
   }
 
   public static <T> T post(
-      String url,
-      InputStream stream,
-      String headerName,
-      String headerValue,
-      long contentLength,
-      Class<T> clazz) {
+      String url,
+      InputStream stream,
+      String headerName,
+      String headerValue,
+      long contentLength,
+      Class<T> clazz) {
     String str = post(url, stream, headerName, headerValue, contentLength);
     return parseJson(str, clazz);
   }
@@ -94,34 +87,19 @@ public static String post(String url, InputStream stream, String headerName, Str
   }
 
   public static String post(
-      String url, InputStream stream, String headerName, String headerValue, long contentLength) {
-    RequestBody requestBody =
-        new RequestBody() {
-          @Override
-          public MediaType contentType() {
-            return MediaType.parse("application/octet-stream");
-          }
-
-          @Override
-          public void writeTo(BufferedSink sink) throws IOException {
-            byte[] buffer = new byte[31];
-            int size = stream.read(buffer);
-            while (size != -1) {
-              sink.write(buffer, 0, size);
-              size = stream.read(buffer);
-            }
-          }
-
-          @Override
-          public long contentLength() throws IOException {
-            return contentLength;
-          }
-        };
-
-    OkHttpClient client = getOkHttpClient();
-    Request request =
-        new Request.Builder().url(url).header(headerName, headerValue).post(requestBody).build();
-    return executeHttpRequest(url, client, request);
+      String url, InputStream stream, String headerName, String headerValue, long contentLength) {
+    HttpRequest request = null;
+    try {
+      var builder = HttpRequest.newBuilder().uri(new URI(url));
+      if (headerName != null && !headerName.isEmpty()) {
+        builder = builder.header(headerName, headerValue);
+      }
+      request = builder.POST(HttpRequest.BodyPublishers.ofInputStream(() -> stream)).build();
+      return executeHttpRequest(url, request, HttpResponse.BodyHandlers.ofString());
+    } catch (Throwable ex) {
+      // request may still be null if URI construction failed, so don't call request.method() here
+      throw new RuntimeException(String.format("Failed to execute POST on %s", url), ex);
+    }
   }
 
   public static <T> T get(String url, String headerName, String headerValue, Class<T> clazz) {
     String str = get(url, headerName, headerValue);
     return parseJson(str, clazz);
   }
 
@@ -130,15 +108,22 @@ public static <T> T get(String url, String headerName, String headerValue, Class
   public static String get(String url, String headerName, String headerValue) {
-    OkHttpClient client = getOkHttpClient();
-    Request request = new Request.Builder().url(url).header(headerName, headerValue).get().build();
-    return executeHttpRequest(url, client, request);
+    try {
+      var builder = HttpRequest.newBuilder().uri(new URI(url));
+      if (headerName != null && !headerName.isEmpty()) {
+        builder = builder.header(headerName, headerValue);
+      }
+      HttpRequest request = builder.GET().build();
+      HttpResponse<String> response =
+          HttpClient.newBuilder().build().send(request, HttpResponse.BodyHandlers.ofString());
+      return response.body();
+    } catch (Throwable ex) {
+      throw new RuntimeException(String.format("Failed to get from %s", url), ex);
+    }
   }
 
   public static String get(String url) {
-    OkHttpClient client = getOkHttpClient();
-    Request request = new Request.Builder().url(url).get().build();
-    return executeHttpRequest(url, client, request);
+    return get(url, null, null);
   }
 
   public static <T> T delete(String url, String headerName, String headerValue, Class<T> clazz) {
     String str = delete(url, headerName, headerValue);
     return parseJson(str, clazz);
   }
 
@@ -147,90 +132,35 @@ public static <T> T delete(String url, String headerName, String headerValue, Cl
   public static String delete(String url, String headerName, String headerValue) {
-    OkHttpClient client = getOkHttpClient();
-    Request request =
-        new Request.Builder().url(url).header(headerName, headerValue).delete().build();
-    return executeHttpRequest(url, client, request);
-  }
-
-  public static Response getHttpResponse(String url, String headerName, String headerValue) {
-    OkHttpClient client = getOkHttpClient();
-    Request request = new Request.Builder().url(url).header(headerName, headerValue).get().build();
-    Response response = null;
-    try {
-      response = client.newCall(request).execute();
-    } catch (IOException ex) {
-      throw new RuntimeException(String.format("Failed to get request to %s", url), ex);
-    }
-    try {
-      checkResponseOK(url, response);
-    } catch (RuntimeException ex) {
-      response.close();
-      throw ex;
-    }
-    return response;
-  }
-
-  private static OkHttpClient getOkHttpClient() {
-    X509TrustManager trustManager =
-        new X509TrustManager() {
-          @Override
-          public void checkClientTrusted(X509Certificate[] chain, String authType)
-              throws CertificateException {}
-
-          @Override
-          public void checkServerTrusted(X509Certificate[] chain, String authType)
-              throws CertificateException {}
-
-          @Override
-          public X509Certificate[] getAcceptedIssuers() {
-            return new X509Certificate[0];
-          }
-        };
-    try {
-      SSLContext sslContext = SSLContext.getInstance("SSL");
-      sslContext.init(null, new TrustManager[] {trustManager}, new java.security.SecureRandom());
-      SSLSocketFactory sslSocketFactory = sslContext.getSocketFactory();
-
-      OkHttpClient.Builder builder = new OkHttpClient.Builder();
-      builder.sslSocketFactory(sslSocketFactory, trustManager);
-      return builder
-          .connectTimeout(90, TimeUnit.SECONDS)
-          .writeTimeout(90, TimeUnit.SECONDS)
-          .readTimeout(90, TimeUnit.SECONDS)
-          .build();
-    } catch (NoSuchAlgorithmException | KeyManagementException e) {
-      throw new RuntimeException("Failed to create OkHttp client", e);
-    }
-  }
-
-  private static String executeHttpRequest(String url, OkHttpClient client, Request request) {
-    try (Response response = client.newCall(request).execute()) {
-      checkResponseOK(url, response);
-      String responseStr = response.body().string();
-      return responseStr;
+    try {
+      var builder = HttpRequest.newBuilder().uri(new URI(url));
+      if (headerName != null && !headerName.isEmpty()) {
+        builder = builder.header(headerName, headerValue);
+      }
+      HttpRequest request = builder.DELETE().build();
+      HttpResponse<String> response =
+          HttpClient.newBuilder().build().send(request, HttpResponse.BodyHandlers.ofString());
+      return response.body();
     } catch (Throwable ex) {
-      throw new RuntimeException(
-          String.format("Failed to execute %s on %s", request.method(), url), ex);
+      throw new RuntimeException(String.format("Failed to delete %s", url), ex);
     }
   }
 
-  private static void checkResponseOK(String url, Response response) {
-    if (response.code() < 200 || response.code() >= 300) {
+  private static void checkResponseOK(String url, HttpResponse response) {
+    if (response.statusCode() < 200 || response.statusCode() >= 300) {
       String responseBodyStr;
       try {
-        responseBodyStr = response.body().string();
-      } catch (IOException e) {
+        responseBodyStr = response.body().toString(); // TODO modify this
+      } catch (Throwable e) {
        responseBodyStr =
-            String.format(
-                "(Failed to get response body, exception: %s)",
-                ExceptionUtils.getExceptionNameAndMessage(e));
+            String.format(
+                "(Failed to get response body, exception: %s)",
+                ExceptionUtils.getExceptionNameAndMessage(e));
       }
       throw new RuntimeException(
-          String.format(
-              "Response for %s is not OK: %s. Response body: %s",
-              url, response.code(), responseBodyStr));
+          String.format(
+              "Response for %s is not OK: %s. Response body: %s",
+              url, response.statusCode(), responseBodyStr));
     }
   }
Response body: %s", + url, response.statusCode(), responseBodyStr)); } } @@ -242,4 +172,39 @@ private static T parseJson(String str, Class clazz) { throw new RuntimeException(String.format("Failed to parse json: %s", str), e); } } + + public static HttpResponse getHttpResponse(String url, String headerName, String headerValue) { + HttpResponse response; + try { + var builder = HttpRequest.newBuilder().uri(new URI(url)); + if (headerName != null && !headerName.isEmpty()) { + builder = builder.header(headerName, headerValue); + } + HttpRequest request = builder.GET().build(); + response = + HttpClient.newBuilder().build().send(request, HttpResponse.BodyHandlers.ofString()); + } catch (Throwable ex) { + throw new RuntimeException(String.format("Failed to get request from %s", url), ex); + } + + try { + checkResponseOK(url, response); + } catch (RuntimeException ex) { + throw ex; + } + return response; + } + + private static String executeHttpRequest( + String url, HttpRequest request, HttpResponse.BodyHandler bodyHandler) { + try { + HttpResponse response = HttpClient.newBuilder().build().send(request, bodyHandler); + checkResponseOK(url, response); + String responseStr = response.body(); + return responseStr; + } catch (Throwable ex) { + throw new RuntimeException( + String.format("Failed to execute %s on %s", request.method(), url), ex); + } + } } diff --git a/src/test/java/com/apple/spark/core/RunningApplicationMonitorTest.java b/src/test/java/com/apple/spark/core/RunningApplicationMonitorTest.java index e0d8e9e..1bf289f 100644 --- a/src/test/java/com/apple/spark/core/RunningApplicationMonitorTest.java +++ b/src/test/java/com/apple/spark/core/RunningApplicationMonitorTest.java @@ -21,7 +21,7 @@ import com.apple.spark.AppConfig; import com.apple.spark.operator.ApplicationState; -import com.apple.spark.operator.SparkApplicationResource; +import com.apple.spark.operator.SparkApplication; import com.apple.spark.operator.SparkApplicationStatus; import com.apple.spark.util.DateTimeUtils; import io.micrometer.core.instrument.logging.LoggingMeterRegistry; @@ -42,8 +42,8 @@ public void test() throws InterruptedException { new RunningApplicationMonitor(sparkCluster, timer, interval, new LoggingMeterRegistry()); Assert.assertEquals(monitor.getApplicationCount(), 0); - SparkApplicationResource prevCRDState = new SparkApplicationResource(); - SparkApplicationResource newCRDState = new SparkApplicationResource(); + SparkApplication prevCRDState = new SparkApplication(); + SparkApplication newCRDState = new SparkApplication(); monitor.onUpdate(prevCRDState, newCRDState); Assert.assertEquals(monitor.getApplicationCount(), 0); @@ -80,7 +80,7 @@ public void test() throws InterruptedException { @Test public void getMaxRunningMillis() { - SparkApplicationResource sparkApplicationResource = new SparkApplicationResource(); + SparkApplication sparkApplicationResource = new SparkApplication(); Assert.assertEquals( RunningApplicationMonitor.getMaxRunningMillis(sparkApplicationResource), 12 * 60 * 60 * 1000); From d548bdb8c8524b4ed3ff0d12d5a94979f7da5245 Mon Sep 17 00:00:00 2001 From: Claudia Sun Date: Sat, 23 Sep 2023 22:43:20 -0700 Subject: [PATCH 2/5] modify gitignore yo include bpg-config.yaml --- .gitignore | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 71beb56..1e00ce7 100644 --- a/.gitignore +++ b/.gitignore @@ -5,7 +5,7 @@ target/ classes/ dependency-reduced-pom.xml -config.yml +bpg-config.yaml venv/ __pycache__/ From 8d11cd00682dad4c4a61f6562d6ac4899a7b8d6c 
Mon Sep 17 00:00:00 2001 From: Claudia Sun Date: Sun, 24 Sep 2023 22:29:53 -0700 Subject: [PATCH 3/5] delete bpg-config.yaml --- bpg-config.yaml | 118 ------------------------------------------------ 1 file changed, 118 deletions(-) delete mode 100644 bpg-config.yaml diff --git a/bpg-config.yaml b/bpg-config.yaml deleted file mode 100644 index dff8962..0000000 --- a/bpg-config.yaml +++ /dev/null @@ -1,118 +0,0 @@ -defaultSparkConf: - spark.kubernetes.submission.connectionTimeout: 30000 - spark.kubernetes.submission.requestTimeout: 30000 - spark.kubernetes.driver.connectionTimeout: 30000 - spark.kubernetes.driver.requestTimeout: 30000 -sparkClusters: - - weight: 100 - id: minikube - eksCluster: arn:aws:eks:us-west-2:168594260668:cluster/og60-skate-eks-02 - masterUrl: https://7EB14B8493566A806F42E3AC481303AA.gr7.us-west-2.eks.amazonaws.com - caCertDataSOPS: LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUM1ekNDQWMrZ0F3SUJBZ0lCQURBTkJna3Foa2lHOXcwQkFRc0ZBREFWTVJNd0VRWURWUVFERXdwcmRXSmwKY201bGRHVnpNQjRYRFRJeU1EUXhOVEl5TVRNeU1Gb1hEVE15TURReE1qSXlNVE15TUZvd0ZURVRNQkVHQTFVRQpBeE1LYTNWaVpYSnVaWFJsY3pDQ0FTSXdEUVlKS29aSWh2Y05BUUVCQlFBRGdnRVBBRENDQVFvQ2dnRUJBTU5oCnpaZGpzazNYbFZtUDc3dlhXVE1xZlNHb3ZPTExTRjU3QUxnVFFGR2lOb3R6WjZvb3BEOStCeW5WWWhoRFpCdlkKV1NuRUN6dFVGSUZTd014STA3bnI2dERPYkk0cGQ1aUNjZzdUV2YzL1ZYMHNQTVNrbGpMUVNpQi9pSHdGSkNlWQpDQVo2cWtvZ29kbFErU0NzL29jYW9XSklBWUI4UmxkZzhQd0FpQ09hNGR5aGF1UlYvdng1WWovS05ucnJqaEZXCkZZZHdvMkRLM0U5OFUwa2VpUTc5QjJvK1pRNEhYZVNYRHczdi96enBLOVQ3ZmJzV2t5NXhYa1EvQU9aMHR3aFcKcmdQQXY3RWhPWTZzeDh1SWY3ZGVtQ1Z0T3lVMUsrYVJkclViekkyd2RpK1E5eEM3NmlDL1ZUbDdwK2I0UzI2UgpTbkc1bi9OWDJoVVdDa0piUXFFQ0F3RUFBYU5DTUVBd0RnWURWUjBQQVFIL0JBUURBZ0trTUE4R0ExVWRFd0VCCi93UUZNQU1CQWY4d0hRWURWUjBPQkJZRUZDbGZmZ1BKRERrcllKdlJOVFptUjRaaysxZU5NQTBHQ1NxR1NJYjMKRFFFQkN3VUFBNElCQVFDS3cwMzhRd1RXYlBvS2ZKZHExRFVuVENXZW1LalVkYlJFZEJyMWdSQ09IZGhyZHZ5UQphSGVkMUcyZDYrbHROeHQ1S1pYU0Q3N1FaRGM3Rk5BWFkrTTFnZGFLT01xaEJUZWkwd0l2VGoxYUdtZWhJRXd3Cm9NK0NmSmloRXFMVS9USGxOVG1zaVIxRG1vMHBkZzNzQWMza3pGZ0hPMEVvUk0xYWlDNUV6SzRqeU1QeXhVYUQKaGNQWkx1ZVE2L3hMckpHRmlCN1Jxai9jT0FyM2ticGJCVUFxYXY1N0V3UEVxbnJHOGQ1Lzd3V25uelk5MkloNgpIRDlwVWJLemF2MFhYVVJsRUZxVXVEbElXZDFCWkdNS1FMTjF0NGtkYy9MWEI5RjlqU055NklaVXQzTjJCTklRCm9pOVpYbHE0RFlLeFdFRUZjVUIrZ2FVK3QrcTVEaERQM04wbwotLS0tLUVORCBDRVJUSUZJQ0FURS0tLS0tCg== - userTokenSOPS: 
ZXlKaGJHY2lPaUpTVXpJMU5pSXNJbXRwWkNJNklqbHpOSEJxTFROTlVqZFFha3R5ZURGVFVDMUdSRmxaWlRSaGVqbDRNbnAxYkZWV2NIUlZRVVpDWHpnaWZRLmV5SnBjM01pT2lKcmRXSmxjbTVsZEdWekwzTmxjblpwWTJWaFkyTnZkVzUwSWl3aWEzVmlaWEp1WlhSbGN5NXBieTl6WlhKMmFXTmxZV05qYjNWdWRDOXVZVzFsYzNCaFkyVWlPaUp6Y0dGeWF5MWhjSEJzYVdOaGRHbHZibk1pTENKcmRXSmxjbTVsZEdWekxtbHZMM05sY25acFkyVmhZMk52ZFc1MEwzTmxZM0psZEM1dVlXMWxJam9pYkc5allXd3RjM0JoY21zdGIzQmxjbUYwYjNJdGMzQmhjbXN0ZEc5clpXNHRiV3h3Y1cwaUxDSnJkV0psY201bGRHVnpMbWx2TDNObGNuWnBZMlZoWTJOdmRXNTBMM05sY25acFkyVXRZV05qYjNWdWRDNXVZVzFsSWpvaWJHOWpZV3d0YzNCaGNtc3RiM0JsY21GMGIzSXRjM0JoY21zaUxDSnJkV0psY201bGRHVnpMbWx2TDNObGNuWnBZMlZoWTJOdmRXNTBMM05sY25acFkyVXRZV05qYjNWdWRDNTFhV1FpT2lJek1qY3dPRGxqWWkwMU1XSm1MVFF3T0dVdE9XTTBaUzFsT0RjMllXSTRZamhpTWpVaUxDSnpkV0lpT2lKemVYTjBaVzA2YzJWeWRtbGpaV0ZqWTI5MWJuUTZjM0JoY21zdFlYQndiR2xqWVhScGIyNXpPbXh2WTJGc0xYTndZWEpyTFc5d1pYSmhkRzl5TFhOd1lYSnJJbjAuTDNCcTk2X3c2dlVQOVVYNnVzQmtvVFpzSVVJX0ZHbm9URFdxOHFRWVFPcnBXUkxqSkUzV0N5ZGxpS1QweWdZbmdfa1FhZ3J1VG1Mb2ZDeUc1dXBrMGd0UEllQi1Nel9aUlhXaldYdkRvME9yb0E0WFhzd1VsMnpSSkx2TEwzRlkwMWJWSXZVd0ZiSmpSNUhKdUtHa2o2U1haQ2ZxdVZRNHJPaDhqUWdJZkk4V2hMRGJlS2F5ZU16Rm5XS1lpdlpocllaeElQMzcteHJUY0V0NW9tZTVYR2hzZXh3UGhJRUdhY2E4NW1MbmV1czU5ZmhYdUhyNERrc3RURDFLYWhZaTBJMnFsdWtYdmtvbVdNRjVidDRwWTV5MkpVV3BXbTB2cHhOc3lPRDRUN01FZ2ZETkhwRDhmYUdVXzRzZlNLTkhXQkpaOEU0cG9PV1FZYko2WU9tM1V3 - userName: spark-operator - sparkApplicationNamespace: spark-applications - sparkServiceAccount: local-spark-operator-spark - sparkVersions: - - "3.2" - queues: - - poc - ttlSeconds: 259200 - sparkUIUrl: http://localhost:8080 - batchScheduler: yunikorn - sparkConf: - spark.kubernetes.executor.podNamePrefix: '{spark-application-resource-name}' - spark.eventLog.enabled: "true" - spark.eventLog.dir: s3a://bpg/eventlog - spark.history.fs.logDirectory: s3a://bpg/eventlog - spark.sql.warehouse.dir: s3a://bpg/warehouse - spark.sql.catalogImplementation: hive - spark.jars.ivy: /opt/spark/work-dir/.ivy2 - spark.hadoop.fs.s3a.connection.ssl.enabled: false - spark.hadoop.fs.s3a.access.key: claudia_sun - spark.hadoop.fs.s3a.secret.key: ad4ae70b65b67ad6a47026b9f128215a0200f7a3bd371d2e9a10f5a482397569 - spark.hadoop.fs.s3a.endpoint: 10.100.223.0:9878 - spark.hadoop.fs.s3a.impl: org.apache.hadoop.fs.s3a.S3AFileSystem - spark.hadoop.fs.s3a.change.detection.version.required: false - spark.hadoop.fs.s3a.change.detection.mode: none - spark.hadoop.fs.s3a.fast.upload: true - spark.jars.packages: org.apache.hadoop:hadoop-aws:3.2.2 - spark.hadoop.fs.s3a.aws.credentials.provider: org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider - sparkUIOptions: - ServicePort: 4040 - ingressAnnotations: - nginx.ingress.kubernetes.io/rewrite-target: /$2 - nginx.ingress.kubernetes.io/proxy-redirect-from: http://$host/ - nginx.ingress.kubernetes.io/proxy-redirect-to: /spark-applications-4/{spark-application-resource-name}/ - kubernetes.io/ingress.class: nginx - nginx.ingress.kubernetes.io/configuration-snippet: | - proxy_set_header Accept-Encoding ""; # disable compression - sub_filter_last_modified off; - sub_filter '' ' '; # add base url - sub_filter 'href="/' 'href="'; # remove absolute URL path so base url applies - sub_filter 'src="/' 'src="'; # remove absolute URL path so base url applies - - sub_filter '/{{num}}/jobs/' '/jobs/'; - - sub_filter "setUIRoot('')" "setUIRoot('/spark-applications-4/{spark-application-resource-name}/')"; # Set UI root for JS scripts - sub_filter "document.baseURI.split" "document.documentURI.split"; # Executors page issue fix - sub_filter_once off; - ingressTLS: - - hosts: - - 
localhost - secretName: localhost-tls-secret - driver: - env: - - name: STATSD_SERVER_IP - valueFrom: - fieldRef: - fieldPath: status.hostIP - - name: STATSD_SERVER_PORT - value: "8125" - executor: - env: - - name: STATSD_SERVER_IP - valueFrom: - fieldRef: - fieldPath: status.hostIP - - name: STATSD_SERVER_PORT - value: "8125" -sparkImages: - - name: apache/spark-py:v3.2.2 - types: - - Python - version: "3.2" - - name: apache/spark:v3.2.2 - types: - - Java - - Scala - version: "3.2" -s3Bucket: bpg -s3Folder: uploaded -sparkLogS3Bucket: bpg -sparkLogIndex: index/index.txt -batchFileLimit: 2016 -sparkHistoryDns: localhost -gatewayDns: localhost -sparkHistoryUrl: http://localhost:8088 -allowedUsers: - - '*' -blockedUsers: [] -queues: - - name: poc - maxRunningMillis: 21600000 -queueTokenSOPS: {} -dbStorageSOPS: - connectionString: jdbc:postgresql://localhost:5432/bpg?useUnicode=yes&characterEncoding=UTF-8&useLegacyDatetimeCode=false&connectTimeout=10000&socketTimeout=30000 - user: bpg - password: samplepass -statusCacheExpireMillis: 9000 -server: - applicationConnectors: - - type: http - port: 8080 -logging: - level: INFO - loggers: - com.apple.spark: INFO -sops: {} From 80e58f6032cf37fe649a10332d89481457c20a90 Mon Sep 17 00:00:00 2001 From: Claudia Sun Date: Mon, 25 Sep 2023 09:51:33 -0700 Subject: [PATCH 4/5] revert .gitignore --- .gitignore | 2 +- bpg-config.yaml | 118 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 119 insertions(+), 1 deletion(-) create mode 100644 bpg-config.yaml diff --git a/.gitignore b/.gitignore index 1e00ce7..71beb56 100644 --- a/.gitignore +++ b/.gitignore @@ -5,7 +5,7 @@ target/ classes/ dependency-reduced-pom.xml -bpg-config.yaml +config.yml venv/ __pycache__/ diff --git a/bpg-config.yaml b/bpg-config.yaml new file mode 100644 index 0000000..5b56f66 --- /dev/null +++ b/bpg-config.yaml @@ -0,0 +1,118 @@ +defaultSparkConf: + spark.kubernetes.submission.connectionTimeout: 30000 + spark.kubernetes.submission.requestTimeout: 30000 + spark.kubernetes.driver.connectionTimeout: 30000 + spark.kubernetes.driver.requestTimeout: 30000 +sparkClusters: + - weight: 100 + id: minikube + eksCluster: minikube + masterUrl: https://127.0.0.1:63978 + caCertDataSOPS: 
LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSURCakNDQWU2Z0F3SUJBZ0lCQVRBTkJna3Foa2lHOXcwQkFRc0ZBREFWTVJNd0VRWURWUVFERXdwdGFXNXAKYTNWaVpVTkJNQjRYRFRJek1ERXdPREU0TXpFeU9Gb1hEVE16TURFd05qRTRNekV5T0Zvd0ZURVRNQkVHQTFVRQpBeE1LYldsdWFXdDFZbVZEUVRDQ0FTSXdEUVlKS29aSWh2Y05BUUVCQlFBRGdnRVBBRENDQVFvQ2dnRUJBTTNkClA2OE1uaVVVTTRtamE5V0JpNlkvamhZeWxsZURhNmtsUEFFSGxHaGFoMTJ1WTg2WnUrSHhuNFdkU0xSOXpTOG0KbVFJczBnVzdJSnF4RFU0bFZQZDlCK2cwY3JPQWxGQXlaT1NOdkkxSWFUVFdQR1dkZ3hyWGZzZ051ek9wYXpsTQovU1MxWUpPQ09KbkduZFNxR1BOTSs2RTIvOTFWSWVDcHUyS3lHd0ZIZDlpOGZqcTVGVG9rdEs1ME01aWtHaHJ1CjE0QXhFRGNLbmtwcENoZDlIM2piWUVnWGk4V2w5c0czdGlmalF4b0xmODVvazJ1UDJYV2dZeWdwYlRoVUFvY28KM3pHL3I1NUYrQ1JDcWdFcjk5RVFPKzljRGRxcFRtOEZSQjBUSVFMOGJ5UXRqOHFzOUpvRkcwaXczT3ZlKzZCNQpDb3dEQ1lvazZqdUFJL1RoY2dFQ0F3RUFBYU5oTUY4d0RnWURWUjBQQVFIL0JBUURBZ0trTUIwR0ExVWRKUVFXCk1CUUdDQ3NHQVFVRkJ3TUNCZ2dyQmdFRkJRY0RBVEFQQmdOVkhSTUJBZjhFQlRBREFRSC9NQjBHQTFVZERnUVcKQkJSSEpxd01Cc0laK2ZTbC9wYS90bzk0S3ZNVExEQU5CZ2txaGtpRzl3MEJBUXNGQUFPQ0FRRUFxU1FxR2V4SApOV3ZFeC9hemR3TDhtNFF3ayt2UzYxMkwvN1VRbmNKV0txbGhTQUorOTRxSFNOZlRlL3VHblJPTXNsRlJFRG9OCmZ4ZXFmRXM4N2c2bDFHQ045SnBXaWZUbmYwU3pQMjE4bGIzdjdWN2F5Um5ncit2V2hMSXNWdzVVRm4rWXNhdmUKZjFUd3k2NmM1Uk5SOFdhREFhL3hLR1BTUkJ2alkxeEVCTmVxOXpPTmlJVDFuN0IvVjhweG84U2k1QkFKbWVhNgp3ZTN6Q3pUalJ0UWN3ZVdEeGJjNG9tL1RmOWE0eVExYTJqMFFwV0J2R1ArQldsc1E5NThodnhvQnBPV1NhM2ExCkErVVdVelBiWUtpN3BkYW1nWFJIU01jV3MwQ3ptOVRiaUF2L2tSNk54VFEwZDU1YzZiTWlPN2xPS251Wmkyd2MKZTdyM1dUNVh5TkNKZUE9PQotLS0tLUVORCBDRVJUSUZJQ0FURS0tLS0tCg== + userTokenSOPS: ZXlKaGJHY2lPaUpTVXpJMU5pSXNJbXRwWkNJNklqaGFNRFZFYmsxNFkzZ3pMV1ppVm5WZmNWbE1NVmx5YnpSM1ExUXlUVGs1UmpGTE1VeDFhMnRyWkRnaWZRLmV5SnBjM01pT2lKcmRXSmxjbTVsZEdWekwzTmxjblpwWTJWaFkyTnZkVzUwSWl3aWEzVmlaWEp1WlhSbGN5NXBieTl6WlhKMmFXTmxZV05qYjNWdWRDOXVZVzFsYzNCaFkyVWlPaUp6Y0dGeWF5MWhjSEJzYVdOaGRHbHZibk1pTENKcmRXSmxjbTVsZEdWekxtbHZMM05sY25acFkyVmhZMk52ZFc1MEwzTmxZM0psZEM1dVlXMWxJam9pYkc5allXd3RjM0JoY21zdGIzQmxjbUYwYjNJdGMzQmhjbXN0ZEc5clpXNHRjbXRpWW13aUxDSnJkV0psY201bGRHVnpMbWx2TDNObGNuWnBZMlZoWTJOdmRXNTBMM05sY25acFkyVXRZV05qYjNWdWRDNXVZVzFsSWpvaWJHOWpZV3d0YzNCaGNtc3RiM0JsY21GMGIzSXRjM0JoY21zaUxDSnJkV0psY201bGRHVnpMbWx2TDNObGNuWnBZMlZoWTJOdmRXNTBMM05sY25acFkyVXRZV05qYjNWdWRDNTFhV1FpT2lJeU16YzVPR0poWmkwMU1UazJMVFF4WVRZdE9ERm1ZaTAwWkdFd1lXVTVaR0prT1dVaUxDSnpkV0lpT2lKemVYTjBaVzA2YzJWeWRtbGpaV0ZqWTI5MWJuUTZjM0JoY21zdFlYQndiR2xqWVhScGIyNXpPbXh2WTJGc0xYTndZWEpyTFc5d1pYSmhkRzl5TFhOd1lYSnJJbjAubjlyVDdnZTJFanQ0YUNSNkVlU21meWFMY2lOWGFsZEdJQk0wVHMyaXJ6alZzT1R4bV9TSnVsNWFKakt5Z1E2MG5QR0szMW16aGVYVHRielFLWk9qU2dkYldkMHVQWWlLcW4taktyQk9JVkdHOHdFV002dlZkTmg3MnFVa0h5M2FxVkVWTzdPd2xrUTlHWVFIeWNHMlFFVUlRdTJuYlJHN3Z5SVc0SHFtdjhvQ2hKYWZQd1VzaUg5S1lsZTliRGJnOFl0S3Q3d2JZYTExbVFBdkU3NS1uS1NxNFBZU1kyUWlZZ21VWk85SFhPRWNIVDFhZjNkdGxiclFBdDZXOFYyQUZmbG5KeFJyTVZINzJLTnZjdlE4N1NBU3ZaMTI2VWpNbHFkNDI1R3VJQUM4aGJIV1g5em5iMF80b3VDLS1pMmNtUUt1dWpzYnNEaWNudkVtMzFPcWVR + userName: spark-operator + sparkApplicationNamespace: spark-applications + sparkServiceAccount: local-spark-operator-spark + sparkVersions: + - "3.2" + queues: + - poc + ttlSeconds: 259200 + sparkUIUrl: http://localhost:8080 + batchScheduler: yunikorn + sparkConf: + spark.kubernetes.executor.podNamePrefix: '{spark-application-resource-name}' + spark.eventLog.enabled: "true" + spark.eventLog.dir: s3a://bpg/eventlog + spark.history.fs.logDirectory: s3a://bpg/eventlog + spark.sql.warehouse.dir: s3a://bpg/warehouse + spark.sql.catalogImplementation: hive + spark.jars.ivy: /opt/spark/work-dir/.ivy2 + spark.hadoop.fs.s3a.connection.ssl.enabled: false + spark.hadoop.fs.s3a.access.key: claudia_sun + 
spark.hadoop.fs.s3a.secret.key: 1cf9542a989eb37304f3e43d5926777d99b3b4c01a7d9939cef7c75a311b9310 + spark.hadoop.fs.s3a.endpoint: 10.107.31.36:9878 + spark.hadoop.fs.s3a.impl: org.apache.hadoop.fs.s3a.S3AFileSystem + spark.hadoop.fs.s3a.change.detection.version.required: false + spark.hadoop.fs.s3a.change.detection.mode: none + spark.hadoop.fs.s3a.fast.upload: true + spark.jars.packages: org.apache.hadoop:hadoop-aws:3.2.2 + spark.hadoop.fs.s3a.aws.credentials.provider: org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider + sparkUIOptions: + ServicePort: 4040 + ingressAnnotations: + nginx.ingress.kubernetes.io/rewrite-target: /$2 + nginx.ingress.kubernetes.io/proxy-redirect-from: http://$host/ + nginx.ingress.kubernetes.io/proxy-redirect-to: /spark-applications-4/{spark-application-resource-name}/ + kubernetes.io/ingress.class: nginx + nginx.ingress.kubernetes.io/configuration-snippet: | + proxy_set_header Accept-Encoding ""; # disable compression + sub_filter_last_modified off; + sub_filter '' ' '; # add base url + sub_filter 'href="/' 'href="'; # remove absolute URL path so base url applies + sub_filter 'src="/' 'src="'; # remove absolute URL path so base url applies + + sub_filter '/{{num}}/jobs/' '/jobs/'; + + sub_filter "setUIRoot('')" "setUIRoot('/spark-applications-4/{spark-application-resource-name}/')"; # Set UI root for JS scripts + sub_filter "document.baseURI.split" "document.documentURI.split"; # Executors page issue fix + sub_filter_once off; + ingressTLS: + - hosts: + - localhost + secretName: localhost-tls-secret + driver: + env: + - name: STATSD_SERVER_IP + valueFrom: + fieldRef: + fieldPath: status.hostIP + - name: STATSD_SERVER_PORT + value: "8125" + executor: + env: + - name: STATSD_SERVER_IP + valueFrom: + fieldRef: + fieldPath: status.hostIP + - name: STATSD_SERVER_PORT + value: "8125" +sparkImages: + - name: apache/spark-py:v3.2.2 + types: + - Python + version: "3.2" + - name: apache/spark:v3.2.2 + types: + - Java + - Scala + version: "3.2" +s3Bucket: bpg +s3Folder: uploaded +sparkLogS3Bucket: bpg +sparkLogIndex: index/index.txt +batchFileLimit: 2016 +sparkHistoryDns: localhost +gatewayDns: localhost +sparkHistoryUrl: http://localhost:8088 +allowedUsers: + - '*' +blockedUsers: [] +queues: + - name: poc + maxRunningMillis: 21600000 +queueTokenSOPS: {} +dbStorageSOPS: + connectionString: jdbc:postgresql://localhost:5432/bpg?useUnicode=yes&characterEncoding=UTF-8&useLegacyDatetimeCode=false&connectTimeout=10000&socketTimeout=30000 + user: bpg + password: samplepass +statusCacheExpireMillis: 9000 +server: + applicationConnectors: + - type: http + port: 8080 +logging: + level: INFO + loggers: + com.apple.spark: INFO +sops: {} From 6df5b5b57d76de1ac255ec628c10d02ab9d99071 Mon Sep 17 00:00:00 2001 From: Claudia Sun Date: Mon, 25 Sep 2023 09:52:51 -0700 Subject: [PATCH 5/5] delete bpg-config.yaml --- bpg-config.yaml | 118 ------------------------------------------------ 1 file changed, 118 deletions(-) delete mode 100644 bpg-config.yaml diff --git a/bpg-config.yaml b/bpg-config.yaml deleted file mode 100644 index 5b56f66..0000000 --- a/bpg-config.yaml +++ /dev/null @@ -1,118 +0,0 @@ -defaultSparkConf: - spark.kubernetes.submission.connectionTimeout: 30000 - spark.kubernetes.submission.requestTimeout: 30000 - spark.kubernetes.driver.connectionTimeout: 30000 - spark.kubernetes.driver.requestTimeout: 30000 -sparkClusters: - - weight: 100 - id: minikube - eksCluster: minikube - masterUrl: https://127.0.0.1:63978 - caCertDataSOPS: 
LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSURCakNDQWU2Z0F3SUJBZ0lCQVRBTkJna3Foa2lHOXcwQkFRc0ZBREFWTVJNd0VRWURWUVFERXdwdGFXNXAKYTNWaVpVTkJNQjRYRFRJek1ERXdPREU0TXpFeU9Gb1hEVE16TURFd05qRTRNekV5T0Zvd0ZURVRNQkVHQTFVRQpBeE1LYldsdWFXdDFZbVZEUVRDQ0FTSXdEUVlKS29aSWh2Y05BUUVCQlFBRGdnRVBBRENDQVFvQ2dnRUJBTTNkClA2OE1uaVVVTTRtamE5V0JpNlkvamhZeWxsZURhNmtsUEFFSGxHaGFoMTJ1WTg2WnUrSHhuNFdkU0xSOXpTOG0KbVFJczBnVzdJSnF4RFU0bFZQZDlCK2cwY3JPQWxGQXlaT1NOdkkxSWFUVFdQR1dkZ3hyWGZzZ051ek9wYXpsTQovU1MxWUpPQ09KbkduZFNxR1BOTSs2RTIvOTFWSWVDcHUyS3lHd0ZIZDlpOGZqcTVGVG9rdEs1ME01aWtHaHJ1CjE0QXhFRGNLbmtwcENoZDlIM2piWUVnWGk4V2w5c0czdGlmalF4b0xmODVvazJ1UDJYV2dZeWdwYlRoVUFvY28KM3pHL3I1NUYrQ1JDcWdFcjk5RVFPKzljRGRxcFRtOEZSQjBUSVFMOGJ5UXRqOHFzOUpvRkcwaXczT3ZlKzZCNQpDb3dEQ1lvazZqdUFJL1RoY2dFQ0F3RUFBYU5oTUY4d0RnWURWUjBQQVFIL0JBUURBZ0trTUIwR0ExVWRKUVFXCk1CUUdDQ3NHQVFVRkJ3TUNCZ2dyQmdFRkJRY0RBVEFQQmdOVkhSTUJBZjhFQlRBREFRSC9NQjBHQTFVZERnUVcKQkJSSEpxd01Cc0laK2ZTbC9wYS90bzk0S3ZNVExEQU5CZ2txaGtpRzl3MEJBUXNGQUFPQ0FRRUFxU1FxR2V4SApOV3ZFeC9hemR3TDhtNFF3ayt2UzYxMkwvN1VRbmNKV0txbGhTQUorOTRxSFNOZlRlL3VHblJPTXNsRlJFRG9OCmZ4ZXFmRXM4N2c2bDFHQ045SnBXaWZUbmYwU3pQMjE4bGIzdjdWN2F5Um5ncit2V2hMSXNWdzVVRm4rWXNhdmUKZjFUd3k2NmM1Uk5SOFdhREFhL3hLR1BTUkJ2alkxeEVCTmVxOXpPTmlJVDFuN0IvVjhweG84U2k1QkFKbWVhNgp3ZTN6Q3pUalJ0UWN3ZVdEeGJjNG9tL1RmOWE0eVExYTJqMFFwV0J2R1ArQldsc1E5NThodnhvQnBPV1NhM2ExCkErVVdVelBiWUtpN3BkYW1nWFJIU01jV3MwQ3ptOVRiaUF2L2tSNk54VFEwZDU1YzZiTWlPN2xPS251Wmkyd2MKZTdyM1dUNVh5TkNKZUE9PQotLS0tLUVORCBDRVJUSUZJQ0FURS0tLS0tCg== - userTokenSOPS: ZXlKaGJHY2lPaUpTVXpJMU5pSXNJbXRwWkNJNklqaGFNRFZFYmsxNFkzZ3pMV1ppVm5WZmNWbE1NVmx5YnpSM1ExUXlUVGs1UmpGTE1VeDFhMnRyWkRnaWZRLmV5SnBjM01pT2lKcmRXSmxjbTVsZEdWekwzTmxjblpwWTJWaFkyTnZkVzUwSWl3aWEzVmlaWEp1WlhSbGN5NXBieTl6WlhKMmFXTmxZV05qYjNWdWRDOXVZVzFsYzNCaFkyVWlPaUp6Y0dGeWF5MWhjSEJzYVdOaGRHbHZibk1pTENKcmRXSmxjbTVsZEdWekxtbHZMM05sY25acFkyVmhZMk52ZFc1MEwzTmxZM0psZEM1dVlXMWxJam9pYkc5allXd3RjM0JoY21zdGIzQmxjbUYwYjNJdGMzQmhjbXN0ZEc5clpXNHRjbXRpWW13aUxDSnJkV0psY201bGRHVnpMbWx2TDNObGNuWnBZMlZoWTJOdmRXNTBMM05sY25acFkyVXRZV05qYjNWdWRDNXVZVzFsSWpvaWJHOWpZV3d0YzNCaGNtc3RiM0JsY21GMGIzSXRjM0JoY21zaUxDSnJkV0psY201bGRHVnpMbWx2TDNObGNuWnBZMlZoWTJOdmRXNTBMM05sY25acFkyVXRZV05qYjNWdWRDNTFhV1FpT2lJeU16YzVPR0poWmkwMU1UazJMVFF4WVRZdE9ERm1ZaTAwWkdFd1lXVTVaR0prT1dVaUxDSnpkV0lpT2lKemVYTjBaVzA2YzJWeWRtbGpaV0ZqWTI5MWJuUTZjM0JoY21zdFlYQndiR2xqWVhScGIyNXpPbXh2WTJGc0xYTndZWEpyTFc5d1pYSmhkRzl5TFhOd1lYSnJJbjAubjlyVDdnZTJFanQ0YUNSNkVlU21meWFMY2lOWGFsZEdJQk0wVHMyaXJ6alZzT1R4bV9TSnVsNWFKakt5Z1E2MG5QR0szMW16aGVYVHRielFLWk9qU2dkYldkMHVQWWlLcW4taktyQk9JVkdHOHdFV002dlZkTmg3MnFVa0h5M2FxVkVWTzdPd2xrUTlHWVFIeWNHMlFFVUlRdTJuYlJHN3Z5SVc0SHFtdjhvQ2hKYWZQd1VzaUg5S1lsZTliRGJnOFl0S3Q3d2JZYTExbVFBdkU3NS1uS1NxNFBZU1kyUWlZZ21VWk85SFhPRWNIVDFhZjNkdGxiclFBdDZXOFYyQUZmbG5KeFJyTVZINzJLTnZjdlE4N1NBU3ZaMTI2VWpNbHFkNDI1R3VJQUM4aGJIV1g5em5iMF80b3VDLS1pMmNtUUt1dWpzYnNEaWNudkVtMzFPcWVR - userName: spark-operator - sparkApplicationNamespace: spark-applications - sparkServiceAccount: local-spark-operator-spark - sparkVersions: - - "3.2" - queues: - - poc - ttlSeconds: 259200 - sparkUIUrl: http://localhost:8080 - batchScheduler: yunikorn - sparkConf: - spark.kubernetes.executor.podNamePrefix: '{spark-application-resource-name}' - spark.eventLog.enabled: "true" - spark.eventLog.dir: s3a://bpg/eventlog - spark.history.fs.logDirectory: s3a://bpg/eventlog - spark.sql.warehouse.dir: s3a://bpg/warehouse - spark.sql.catalogImplementation: hive - spark.jars.ivy: /opt/spark/work-dir/.ivy2 - spark.hadoop.fs.s3a.connection.ssl.enabled: false - spark.hadoop.fs.s3a.access.key: claudia_sun - 
spark.hadoop.fs.s3a.secret.key: 1cf9542a989eb37304f3e43d5926777d99b3b4c01a7d9939cef7c75a311b9310 - spark.hadoop.fs.s3a.endpoint: 10.107.31.36:9878 - spark.hadoop.fs.s3a.impl: org.apache.hadoop.fs.s3a.S3AFileSystem - spark.hadoop.fs.s3a.change.detection.version.required: false - spark.hadoop.fs.s3a.change.detection.mode: none - spark.hadoop.fs.s3a.fast.upload: true - spark.jars.packages: org.apache.hadoop:hadoop-aws:3.2.2 - spark.hadoop.fs.s3a.aws.credentials.provider: org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider - sparkUIOptions: - ServicePort: 4040 - ingressAnnotations: - nginx.ingress.kubernetes.io/rewrite-target: /$2 - nginx.ingress.kubernetes.io/proxy-redirect-from: http://$host/ - nginx.ingress.kubernetes.io/proxy-redirect-to: /spark-applications-4/{spark-application-resource-name}/ - kubernetes.io/ingress.class: nginx - nginx.ingress.kubernetes.io/configuration-snippet: | - proxy_set_header Accept-Encoding ""; # disable compression - sub_filter_last_modified off; - sub_filter '' ' '; # add base url - sub_filter 'href="/' 'href="'; # remove absolute URL path so base url applies - sub_filter 'src="/' 'src="'; # remove absolute URL path so base url applies - - sub_filter '/{{num}}/jobs/' '/jobs/'; - - sub_filter "setUIRoot('')" "setUIRoot('/spark-applications-4/{spark-application-resource-name}/')"; # Set UI root for JS scripts - sub_filter "document.baseURI.split" "document.documentURI.split"; # Executors page issue fix - sub_filter_once off; - ingressTLS: - - hosts: - - localhost - secretName: localhost-tls-secret - driver: - env: - - name: STATSD_SERVER_IP - valueFrom: - fieldRef: - fieldPath: status.hostIP - - name: STATSD_SERVER_PORT - value: "8125" - executor: - env: - - name: STATSD_SERVER_IP - valueFrom: - fieldRef: - fieldPath: status.hostIP - - name: STATSD_SERVER_PORT - value: "8125" -sparkImages: - - name: apache/spark-py:v3.2.2 - types: - - Python - version: "3.2" - - name: apache/spark:v3.2.2 - types: - - Java - - Scala - version: "3.2" -s3Bucket: bpg -s3Folder: uploaded -sparkLogS3Bucket: bpg -sparkLogIndex: index/index.txt -batchFileLimit: 2016 -sparkHistoryDns: localhost -gatewayDns: localhost -sparkHistoryUrl: http://localhost:8088 -allowedUsers: - - '*' -blockedUsers: [] -queues: - - name: poc - maxRunningMillis: 21600000 -queueTokenSOPS: {} -dbStorageSOPS: - connectionString: jdbc:postgresql://localhost:5432/bpg?useUnicode=yes&characterEncoding=UTF-8&useLegacyDatetimeCode=false&connectTimeout=10000&socketTimeout=30000 - user: bpg - password: samplepass -statusCacheExpireMillis: 9000 -server: - applicationConnectors: - - type: http - port: 8080 -logging: - level: INFO - loggers: - com.apple.spark: INFO -sops: {}
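
Reviewer note: PATCH 1/5 moves HttpUtils onto the JDK 11+ java.net.http client. The sketch below is a minimal, self-contained illustration of the same request/response pattern for reviewers unfamiliar with that API; it is not code from this repository, the endpoint URL and header values are placeholders, and HttpUtilsSketch is a hypothetical class name.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Minimal sketch of the java.net.http pattern adopted by HttpUtils:
// build a GET request, optionally attach a single header, send it
// synchronously, and fail fast on any non-2xx status code.
public class HttpUtilsSketch {

  public static void main(String[] args) throws Exception {
    String url = "http://localhost:8080/healthcheck"; // placeholder endpoint
    String headerName = "Accept";                     // placeholder header
    String headerValue = "application/json";

    HttpRequest.Builder builder = HttpRequest.newBuilder().uri(new URI(url));
    if (headerName != null && !headerName.isEmpty()) {
      builder = builder.header(headerName, headerValue);
    }
    HttpRequest request = builder.GET().build();

    HttpResponse<String> response =
        HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());

    // Equivalent of checkResponseOK: anything outside 200-299 is an error.
    if (response.statusCode() < 200 || response.statusCode() >= 300) {
      throw new RuntimeException(
          String.format(
              "Request to %s failed with status %s. Response body: %s",
              url, response.statusCode(), response.body()));
    }
    System.out.println(response.body());
  }
}

The same pattern underlies both getHttpResponse (GET with one optional header) and executeHttpRequest (caller-supplied request and body handler); centralizing the status check in checkResponseOK keeps every call site failing fast with a descriptive message.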