Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
35eca1a
Adding Serverless Provisioner
sahusanket Mar 17, 2025
f5f46ce
ClassLoading issue for DataprocRuntimeEnvironment
sahusanket Mar 18, 2025
7d4ca87
ClassLoading issue for DataprocRuntimeEnvironment
sahusanket Mar 19, 2025
56d7376
Issue : When running with master 'yarn' either HADOOP_CONF_DIR or YA…
sahusanket Mar 19, 2025
ffcf4bf
Issue : URI has a fragment component
sahusanket Mar 20, 2025
30d84d3
Issue : ElementTrackingStore cannot be cast to class org.apache.spa…
sahusanket Mar 24, 2025
244c35e
Issue : jar exists and does not match contents of spark
sahusanket Mar 25, 2025
893a8bf
Issue : DataprocMetricsListener is not a subclass of org.apache.spa…
sahusanket Mar 25, 2025
7df70f7
Issue : NoSuchFileException HydratorSpark.config
sahusanket Mar 25, 2025
4c59dce
Clean up1 : Pom dependency mistake
sahusanket Mar 26, 2025
e35ab07
Adding ServerlessDataprocSubmitter
sahusanket Mar 26, 2025
e938bb1
Adding network field in json
sahusanket Mar 27, 2025
1584323
Removing prefix for Getting Job Details
sahusanket Mar 27, 2025
be02040
Adding support for Stopping or Killing job which is CANCEL for batches.
sahusanket May 8, 2025
638a6e2
Kill serverless job is running in Deprovision state
sahusanket May 8, 2025
cce5976
Testing skip delete of file.
sahusanket May 9, 2025
e086ade
Reverting kill job on deprovision
sahusanket May 9, 2025
0662e2b
Removing GET ...makes it sync and wait
sahusanket May 14, 2025
2bc38cf
Removing master add in spark submit
sahusanket May 14, 2025
a87bedb
fixing LOG_DIR
sahusanket May 14, 2025
b29f4e9
Adding verbose option in java driver n executor
sahusanket May 15, 2025
819f2e0
trying adding Worker entry point for rewrite and intercept
sahusanket May 15, 2025
b6d2a42
Revert "trying adding Worker entry point for rewrite and intercept"
sahusanket May 15, 2025
2663ec9
Adding logs for debugging archive artifact
sahusanket May 15, 2025
85716d1
Manually renaming artifacts archive
sahusanket May 15, 2025
1d6ffbc
Manually adding --jars with artifacts
sahusanket May 16, 2025
204bc73
adding user arg
sahusanket May 16, 2025
2bba481
adding all jars in --jars
sahusanket May 19, 2025
5ffc18a
commenting archives to test
sahusanket May 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import io.cdap.cdap.etl.spark.function.JoinOnFunction;
import io.cdap.cdap.etl.spark.function.PluginFunctionContext;
import io.cdap.cdap.internal.io.SchemaTypeAdapter;
import org.apache.spark.SparkFiles;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
Expand All @@ -74,6 +75,7 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
Expand Down Expand Up @@ -203,7 +205,8 @@ public void run(DatasetContext context) throws Exception {
BatchPhaseSpec phaseSpec = GSON.fromJson(sec.getSpecification().getProperty(Constants.PIPELINEID),
BatchPhaseSpec.class);

Path configFile = sec.getLocalizationContext().getLocalFile("HydratorSpark.config").toPath();
//Issue : NoSuchFileException HydratorSpark.config
Path configFile = Paths.get(SparkFiles.get("HydratorSpark.config"));
try (BufferedReader reader = Files.newBufferedReader(configFile, StandardCharsets.UTF_8)) {
String object = reader.readLine();
SparkBatchSourceSinkFactoryInfo sourceSinkInfo = GSON.fromJson(object, SparkBatchSourceSinkFactoryInfo.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,9 @@ public List<Finisher> prepare(PhaseSpec phaseSpec)
throws TransactionFailureException, InstantiationException, IOException {
stageOperations = new HashMap<>();
stagePartitions = new HashMap<>();

File configFile = File.createTempFile("HydratorSpark", ".config");
// Issue : NoSuchFileException HydratorSpark.config
// File configFile = File.createTempFile("HydratorSpark", ".config");
File configFile = new File("/tmp/HydratorSpark.config");
if (!configFile.getParentFile().exists()) {
configFile.getParentFile().mkdirs();
}
Expand Down
132 changes: 1 addition & 131 deletions cdap-common/src/main/resources/cdap-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1280,7 +1280,7 @@

<property>
<name>data.storage.properties.gcp-spanner.compression.config</name>
<value>application_specs:application_data:SNAPPY,provisioner_data:provisioner_task_info:SNAPPY,destination_fields_table:destination_data:SNAPPY,summary_fields_table:destination_data:SNAPPY,run_records:run_record_data:SNAPPY,workflows:workflow_data:SNAPPY,operations_table:operations:SNAPPY</value>
<value>application_specs:application_data:SNAPPY,provisioner_data:provisioner_task_info:SNAPPY,destination_fields_table:destination_data:SNAPPY,summary_fields_table:destination_data:SNAPPY,run_records:run_record_data:SNAPPY,workflows:workflow_data:SNAPPY</value>
<description>
Represents table_name:column_name:compressor_type which possibly can cross spanner cell limit
and will need compression to reduce size.
Expand Down Expand Up @@ -2845,11 +2845,6 @@
<value>/opt/cdap/master/ext/log-publisher</value>
</property>

<property>
<name>metadata.storage.extensions.dir</name>
<value>/opt/cdap/master/ext/metadata-storage</value>
</property>

<!-- Metrics Configuration -->

<property>
Expand Down Expand Up @@ -5025,26 +5020,6 @@
</description>
</property>

<property>
<name>security.store.system.properties.gcp-cloudkms.key.rotation.enabled</name>
<value>true</value>
<description>
Enable key rotation for KMS keys used to encrypt and decrypt secure keys.
When true, it will enable rotation for newly created keys as well as
existing keys (after upgrade).
</description>
</property>

<property>
<name>security.store.system.properties.gcp-cloudkms.key.rotation.period.days</name>
<value>90</value>
<description>
Specifies the rotation period applied to KMS keys when
security.store.system.properties.gcp-cloudkms.key.rotation.enabled is true.
The rotation period must be at least 1 day.
</description>
</property>

<property>
<name>security.token.digest.algorithm</name>
<value>HmacSHA256</value>
Expand Down Expand Up @@ -5610,95 +5585,6 @@
</description>
</property>

<property>
<name>task.worker.probe.enabled</name>
<value>false</value>
<description>
Whether to configure a probe for the task worker.
</description>
</property>

<property>
<name>task.worker.probe.k8s.names</name>
<value>liveness</value>
<description>
A Comma separated value of kubernetes probe names. Ex : "liveness","readiness"
</description>
</property>

<property>
<name>task.worker.probe.k8s.liveness.type</name>
<value>httpget</value>
<description>
Which type of liveness probe to be used.
</description>
</property>

<property>
<name>task.worker.probe.k8s.liveness.http.port</name>
<value>${task.worker.bind.port}</value>
<description>
The port number on the container where the API server is listening.
</description>
</property>

<property>
<name>task.worker.probe.k8s.liveness.http.path</name>
<value>/ping</value>
<description>
The specific URL path to access on the container.
</description>
</property>

<property>
<name>task.worker.probe.k8s.liveness.init.delay.seconds</name>
<value>20</value>
<description>
Seconds to wait after container start before the first probe for a task worker.
</description>
</property>

<property>
<name>task.worker.probe.k8s.liveness.timeout.seconds</name>
<value>5</value>
<description>
Seconds after which the probe is considered failed due to no response.
</description>
</property>

<property>
<name>task.worker.probe.k8s.liveness.failure.threshold</name>
<value>12</value>
<description>
Number of consecutive failures before k8s restarts the task worker container.
</description>
</property>

<property>
<name>task.worker.probe.k8s.liveness.period.seconds</name>
<value>10</value>
<description>
The time interval (in seconds) between consecutive probe executions.
</description>
</property>

<property>
<name>task.worker.probe.k8s.liveness.success.threshold</name>
<value>1</value>
<description>
Minimum consecutive successes for the probe to be considered successful after having failed.
Keeping same as K8s default.
</description>
</property>

<property>
<name>task.worker.probe.k8s.liveness.http.scheme</name>
<value>HTTPS</value>
<description>
The protocol used to connect to the host.
</description>
</property>

<!-- System pods Configuration -->
<property>
<name>system.worker.program.twill.controller.start.seconds</name>
Expand Down Expand Up @@ -6243,14 +6129,6 @@
</description>
</property>

<property>
<name>feature.wrangler.workspace.auth.check.enabled</name>
<value>false</value>
<description>
Enables authorization enforcement on workspace actions in Wrangler. If disabled, no checks will be performed.
</description>
</property>

<property>
<name>feature.namespaced.service.accounts.enabled</name>
<value>false</value>
Expand Down Expand Up @@ -6724,12 +6602,4 @@
</description>
</property>

<property>
<name>ui.default.poll.interval.millis</name>
<value>10000</value>
<description>
The default polling interval in milliseconds for poll calls from cdap-ui.
</description>
</property>

</configuration>
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
/*
* Copyright © 2025 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.runtime.spi.provisioner.dataproc;

import com.google.common.base.Strings;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.runtime.spi.RuntimeMonitorType;
import io.cdap.cdap.runtime.spi.common.DataprocImageVersion;
import io.cdap.cdap.runtime.spi.common.DataprocUtils;
import io.cdap.cdap.runtime.spi.provisioner.Cluster;
import io.cdap.cdap.runtime.spi.provisioner.ClusterStatus;
import io.cdap.cdap.runtime.spi.provisioner.PollingStrategies;
import io.cdap.cdap.runtime.spi.provisioner.PollingStrategy;
import io.cdap.cdap.runtime.spi.provisioner.ProvisionerContext;
import io.cdap.cdap.runtime.spi.provisioner.ProvisionerSpecification;
import io.cdap.cdap.runtime.spi.runtimejob.DataprocClusterInfo;
import io.cdap.cdap.runtime.spi.runtimejob.DataprocRuntimeJobManager;
import io.cdap.cdap.runtime.spi.runtimejob.RuntimeJobDetail;
import io.cdap.cdap.runtime.spi.runtimejob.RuntimeJobManager;
import io.cdap.cdap.runtime.spi.runtimejob.ServerlessDataprocRuntimeJobManager;
import io.cdap.cdap.runtime.spi.ssh.SSHKeyPair;
import io.cdap.cdap.runtime.spi.ssh.SSHPublicKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

/**
 * Provisioner that submits jobs to Dataproc Serverless (the Dataproc Batches API).
 *
 * <p>No real cluster is ever created or torn down: {@link #createCluster} returns a
 * synthetic RUNNING cluster so the provisioning state machine can proceed, and actual
 * job submission is delegated to {@link ServerlessDataprocRuntimeJobManager}.
 */
public class ServerlessDataprocProvisioner extends AbstractDataprocProvisioner {

  private static final Logger LOG = LoggerFactory.getLogger(ServerlessDataprocProvisioner.class);

  private static final ProvisionerSpecification SPEC = new ProvisionerSpecification(
      "gcp-serverless-dataproc", "Serverless Dataproc",
      "Connect and Execute jobs on Serverless Dataproc (Batches).");

  // Property key under which the (synthetic) cluster name is looked up, and the
  // name reported for the fake cluster returned by createCluster().
  private static final String CLUSTER_NAME = "SERVERLESS_DATAPROC";

  // Serverless runtime version used when no image version is configured.
  private static final String DEFAULT_SERVERLESS_RUNTIME_VERSION = "1.1";

  public ServerlessDataprocProvisioner() {
    super(SPEC);
  }

  @Override
  public void validateProperties(Map<String, String> properties) {
    // DataprocConf.create performs all validation as a side effect; the
    // resulting instance is not needed here.
    DataprocConf.create(properties);
  }

  @Override
  protected String getClusterName(ProvisionerContext context) {
    return context.getProperties().get(CLUSTER_NAME);
  }

  @Override
  public Cluster createCluster(ProvisionerContext context) throws Exception {
    // Serverless Dataproc has no cluster to create. Validate the configuration,
    // then return a synthetic RUNNING cluster so provisioning completes immediately.
    //
    // TODO 1: Ensure labels are added while submitting a job
    //   (from AbstractDataprocProvisioner#getCommonDataprocLabels).
    // TODO 2: Ensure the Spark runtime version (image) is compatible while submitting the job.
    Map<String, String> contextProperties = createContextProperties(context);
    // Created only for its validation side effects.
    DataprocConf.create(contextProperties);

    return new Cluster(CLUSTER_NAME, ClusterStatus.RUNNING,
        Collections.emptyList(), Collections.emptyMap());
  }

  @Override
  protected void doDeleteCluster(ProvisionerContext context, Cluster cluster, DataprocConf conf) {
    // no-op: there is no actual cluster backing a serverless batch.
  }

  @Override
  public ClusterStatus getClusterStatus(ProvisionerContext context, Cluster cluster) {
    ClusterStatus status = cluster.getStatus();
    // The synthetic cluster is never really deleted; report DELETING as NOT_EXISTS
    // so deprovisioning finishes right away.
    return status == ClusterStatus.DELETING ? ClusterStatus.NOT_EXISTS : status;
  }

  @Override
  public Cluster getClusterDetail(ProvisionerContext context, Cluster cluster) {
    return new Cluster(cluster, getClusterStatus(context, cluster));
  }

  @Override
  public PollingStrategy getPollingStrategy(ProvisionerContext context, Cluster cluster) {
    // Nothing is actually being created, so poll with no delay while "creating".
    if (cluster.getStatus() == ClusterStatus.CREATING) {
      return PollingStrategies.fixedInterval(0, TimeUnit.SECONDS);
    }
    DataprocConf conf = DataprocConf.create(createContextProperties(context));
    return PollingStrategies.fixedInterval(conf.getPollInterval(), TimeUnit.SECONDS);
  }

  /**
   * Provides the {@link RuntimeJobManager} implementation used to launch serverless batches.
   *
   * <p>Unlike the cluster-based provisioner, serverless submission always goes through a
   * runtime job manager — there is no SSH fallback — so this never returns
   * {@link Optional#empty()}.
   *
   * @throws RuntimeException if Dataproc credentials cannot be obtained (cause preserved)
   */
  @Override
  public Optional<RuntimeJobManager> getRuntimeJobManager(ProvisionerContext context) {
    Map<String, String> properties = createContextProperties(context);
    DataprocConf conf = DataprocConf.create(properties);

    try {
      String clusterName = getClusterName(context);
      String projectId = conf.getProjectId();
      String region = conf.getRegion();
      String bucket =
          conf.getGcsBucket() != null ? conf.getGcsBucket() : properties.get(DataprocUtils.BUCKET);
      return Optional.of(
          new ServerlessDataprocRuntimeJobManager(
              new DataprocClusterInfo(context, clusterName, conf.getDataprocCredentials(),
                  getRootUrl(conf), projectId,
                  region, bucket, getCommonDataprocLabels(context)),
              Collections.unmodifiableMap(properties), context.getCDAPVersionInfo(),
              getImageVersion(conf)));
    } catch (Exception e) {
      throw new RuntimeException("Error while getting credentials for dataproc. ", e);
    }
  }

  /**
   * Returns the configured Dataproc image version, falling back to
   * {@value #DEFAULT_SERVERLESS_RUNTIME_VERSION} when none is set.
   */
  String getImageVersion(DataprocConf conf) {
    String imageVersion = conf.getImageVersion();
    if (imageVersion == null) {
      imageVersion = DEFAULT_SERVERLESS_RUNTIME_VERSION;
    }
    // Normal control flow: log parameterized at debug, not warn with string concat.
    LOG.debug("Using serverless runtime version: {}", imageVersion);
    return imageVersion;
  }
}
Loading
Loading