Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FSTORE-1008] add java client to hsfs #1413

Open
wants to merge 18 commits into
base: branch-3.9
Choose a base branch
from
6 changes: 5 additions & 1 deletion java/beam/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>hsfs-parent</artifactId>
<groupId>com.logicalclocks</groupId>
<version>3.9.0-RC8</version>
<version>3.9.0-RC9</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand All @@ -27,6 +27,10 @@
<groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

Expand Down
191 changes: 2 additions & 189 deletions java/beam/src/main/java/com/logicalclocks/hsfs/beam/FeatureStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,65 +19,23 @@

import com.logicalclocks.hsfs.FeatureStoreBase;
import com.logicalclocks.hsfs.FeatureStoreException;
import com.logicalclocks.hsfs.StatisticsConfig;
import com.logicalclocks.hsfs.StorageConnector;
import com.logicalclocks.hsfs.TimeTravelFormat;
import com.logicalclocks.hsfs.TrainingDatasetBase;
import com.logicalclocks.hsfs.beam.constructor.Query;
import com.logicalclocks.hsfs.beam.engine.FeatureGroupEngine;
import com.logicalclocks.hsfs.beam.engine.FeatureViewEngine;
import com.logicalclocks.hsfs.metadata.StorageConnectorApi;
import lombok.NonNull;

import java.io.IOException;
import java.util.List;

public class FeatureStore extends FeatureStoreBase<Query> {

private FeatureGroupEngine featureGroupEngine;
private FeatureViewEngine featureViewEngine;

public FeatureStore() {
storageConnectorApi = new StorageConnectorApi();
featureViewEngine = new FeatureViewEngine();
featureGroupEngine = new FeatureGroupEngine();
}

@Override
public Object createFeatureGroup() {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object getFeatureGroups(@NonNull String name) throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object getOrCreateFeatureGroup(String name, Integer version) throws IOException, FeatureStoreException {
throw new UnsupportedOperationException("Not supported for Beam");
}


@Override
public Object getOrCreateFeatureGroup(String name, Integer version, List<String> primaryKeys,
boolean onlineEnabled, String eventTime) throws IOException, FeatureStoreException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object getOrCreateFeatureGroup(String name, Integer version, List<String> primaryKeys,
List<String> partitionKeys, boolean onlineEnabled, String eventTime) throws IOException, FeatureStoreException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object getOrCreateFeatureGroup(String name, Integer version, String description, List<String> primaryKeys,
List<String> partitionKeys, String hudiPrecombineKey, boolean onlineEnabled, TimeTravelFormat timeTravelFormat,
StatisticsConfig statisticsConfig, String topicName, String notificationTopicName, String eventTime)
throws IOException, FeatureStoreException {
throw new UnsupportedOperationException("Not supported for Beam");
}

/**
* Get a stream feature group object from the feature store.
*
Expand Down Expand Up @@ -132,47 +90,6 @@ public StreamFeatureGroup getStreamFeatureGroup(@NonNull String name, @NonNull I
return featureGroupEngine.getStreamFeatureGroup(this, name, version);
}

@Override
public Object createStreamFeatureGroup() {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public StreamFeatureGroup getOrCreateStreamFeatureGroup(String name, Integer version)
throws IOException, FeatureStoreException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public StreamFeatureGroup getOrCreateStreamFeatureGroup(String name, Integer version, List<String> primaryKeys,
boolean onlineEnabled, String eventTime) throws IOException, FeatureStoreException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public StreamFeatureGroup getOrCreateStreamFeatureGroup(String name, Integer version, List<String> primaryKeys,
List<String> partitionKeys, boolean onlineEnabled, String eventTime) throws IOException, FeatureStoreException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public StreamFeatureGroup getOrCreateStreamFeatureGroup(String name, Integer version, String description,
List<String> primaryKeys, List<String> partitionKeys, String hudiPrecombineKey, boolean onlineEnabled,
StatisticsConfig statisticsConfig, String eventTime, TimeTravelFormat timeTravelFormat)
throws IOException, FeatureStoreException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object createExternalFeatureGroup() {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object createFeatureView() {
throw new UnsupportedOperationException("Not supported for Beam");
}

/**
* Get a feature view object from the selected feature store.
*
Expand Down Expand Up @@ -206,7 +123,7 @@ public FeatureView getFeatureView(@NonNull String name, @NonNull Integer version
* }
* </pre>
*
* @param name Name of the feature view.
* @param name Name of the feature view.
* @return FeatureView The feature view metadata object.
* @throws FeatureStoreException If unable to retrieve FeatureView from the feature store.
* @throws IOException Generic IO exception.
Expand All @@ -216,108 +133,4 @@ public FeatureView getFeatureView(String name) throws FeatureStoreException, IOE
+ DEFAULT_VERSION + "`.");
return getFeatureView(name, DEFAULT_VERSION);
}

@Override
public Object getOrCreateFeatureView(String name, Query query, Integer version)
throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object getOrCreateFeatureView(String name, Query query, Integer version, String description,
List<String> labels) throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object getExternalFeatureGroup(@NonNull String name, @NonNull Integer version)
throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object getExternalFeatureGroup(String name) throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public StorageConnector getStorageConnector(String name) throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object getHopsFsConnector(String name) throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object getExternalFeatureGroups(@NonNull String name) throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object sql(String name) {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object getJdbcConnector(String name) throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object getS3Connector(String name) throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object getRedshiftConnector(String name) throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object getSnowflakeConnector(String name) throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object getAdlsConnector(String name) throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for beam");
}

@Override
public Object getKafkaConnector(String name) throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object getBigqueryConnector(String name) throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object getOnlineStorageConnector() throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object getGcsConnector(String name) throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public TrainingDatasetBase getTrainingDataset(@NonNull String name, @NonNull Integer version)
throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public TrainingDatasetBase getTrainingDataset(String name) throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object getTrainingDatasets(@NonNull String name) throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}
}
127 changes: 0 additions & 127 deletions java/beam/src/main/java/com/logicalclocks/hsfs/beam/FeatureView.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,136 +17,9 @@

package com.logicalclocks.hsfs.beam;

import com.logicalclocks.hsfs.FeatureStoreException;
import com.logicalclocks.hsfs.FeatureViewBase;
import com.logicalclocks.hsfs.beam.constructor.Query;
import org.apache.beam.sdk.values.PCollection;

import java.io.IOException;
import java.text.ParseException;
import java.util.Map;

public class FeatureView extends FeatureViewBase<FeatureView, FeatureStore, Query, PCollection<Object>> {
@Override
public void addTag(String name, Object value) throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Map<String, Object> getTags() throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object getTag(String name) throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public void deleteTag(String name) throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public void addTrainingDatasetTag(Integer version, String name, Object value)
throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Map<String, Object> getTrainingDatasetTags(Integer version) throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object getTrainingDatasetTag(Integer version, String name) throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public void deleteTrainingDatasetTag(Integer version, String name) throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public void delete() throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public void clean(FeatureStore featureStore, String featureViewName, Integer featureViewVersion)
throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public FeatureView update(FeatureView other) throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public String getBatchQuery() throws FeatureStoreException, IOException, ParseException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public String getBatchQuery(String startTime, String endTime)
throws FeatureStoreException, IOException, ParseException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public PCollection<Object> getBatchData() throws FeatureStoreException, IOException, ParseException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public PCollection<Object> getBatchData(String startTime, String endTime)
throws FeatureStoreException, IOException, ParseException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public PCollection<Object> getBatchData(String startTime, String endTime, Map<String, String> readOptions)
throws FeatureStoreException, IOException, ParseException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object getTrainingData(Integer version, Map<String, String> readOptions)
throws IOException, FeatureStoreException, ParseException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object getTrainTestSplit(Integer version, Map<String, String> readOptions)
throws IOException, FeatureStoreException, ParseException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public Object getTrainValidationTestSplit(Integer version, Map<String, String> readOptions)
throws IOException, FeatureStoreException, ParseException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public void purgeTrainingData(Integer version) throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public void purgeAllTrainingData() throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public void deleteTrainingDataset(Integer version) throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}

@Override
public void deleteAllTrainingDatasets() throws FeatureStoreException, IOException {
throw new UnsupportedOperationException("Not supported for Beam");
}
}
Loading
Loading