Merge pull request #47 from Mu-Sigma/develop
Corrections to vignettes
naren1991 authored Jan 3, 2019
2 parents d53b6ad + a09e945 commit ab5cfd2
Showing 2 changed files with 8 additions and 8 deletions.
2 changes: 1 addition & 1 deletion vignettes/Interoperable_Pipelines.Rmd
@@ -231,7 +231,7 @@ opWithFilter %>>% getOutputById(2)

Finally, we show a case where sequential filtering steps are performed in Spark before visualizing in R and running a decision tree model in Python.

Note that in this case, we register `getFeaturesForPyClassification` and `getTargetForPyClassification` as *non-data* functions. In this particular pipeline, there is no main *path* as such, as the pipeline branches into two paths - one in R and the other in Python. In such cases, using `outAsIn` or the `dataFunction` parameter with formula semantics is just a **question of convenience**. If the first argument of a *non-data* function is of a data frame class in R, Python (Pandas) or Spark, the package automatically performs type conversions when environments are switched (R -> Spark, Spark -> Python, and so on).
Note that in this case, `getFeaturesForPyClassification` and `getTargetForPyClassification` have been registered as *data* functions. Type conversions between R, Spark and Python for data functions are performed automatically by the package.
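A minimal sketch of the registration mechanism this paragraph refers to, assuming `registerFunction()` accepts an `isDataFunction` argument and that a pipeline object is created with `AnalysisPipeline(input = ...)`; `getSetosaSubset` is a hypothetical user function, not one from the package:

```{r}
library(analysisPipelines)

# A hypothetical user-defined function whose first argument carries the data
# flowing through the pipeline
getSetosaSubset <- function(df) {
  df[df$Species == "setosa", ]
}

# Register it as a *data* function so the package tracks its first argument
# as the pipeline data (the isDataFunction argument name is assumed here)
registerFunction("getSetosaSubset", isDataFunction = TRUE)

# Compose it into a pipeline; when a subsequent function runs on a different
# engine (Spark or Python), the package converts the data frame automatically
pipelineObj <- AnalysisPipeline(input = iris)
pipelineObj %>>% getSetosaSubset() %>>% generateOutput() %>>% getOutputById(1)
```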

```{r}
pipelineObj %>>% filterData_spark(condition = "Species == 'setosa' or Species == 'virginica'") %>>%
14 changes: 7 additions & 7 deletions in the streaming analysis pipelines vignette
@@ -51,9 +51,9 @@ knitr::opts_chunk$set(
library(analysisPipelines)
library(SparkR)
## Define these variables as per the configuration of your machine. This is just an example.
## Define these variables as per the configuration of your machine. The example below is just illustrative.
sparkHome <- "/Users/naren/softwares/spark-2.3.1-bin-hadoop2.7/"
sparkHome <- "/path/to/spark/directory/"
sparkMaster <- "local[1]"
sparkPackages <- c("org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1")
# Set spark home variable if not present
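A minimal sketch of how the variables defined above might feed into session creation, using plain `SparkR::sparkR.session()`; the vignette itself may instead rely on the package helper `sparkRSessionCreateIfNotPresent()`, and the chunk shown in this hunk is truncated:

```{r}
# Sketch only: start a Spark session from the variables defined above
if (nchar(Sys.getenv("SPARK_HOME")) == 0) {
  Sys.setenv(SPARK_HOME = sparkHome)
}

SparkR::sparkR.session(master = sparkMaster,
                       sparkHome = Sys.getenv("SPARK_HOME"),
                       sparkPackages = sparkPackages)
```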
@@ -81,10 +81,10 @@ This example illustrates usage of pipelines for a streaming application. In this
Read streaming data from Kafka.

```{r}
## Define these variables as per the configuration of your machine. This is just an example.
## Define these variables as per the configuration of your machine. The example below is just illustrative.
kafkaBootstrapServers <- "172.25.0.144:9092,172.25.0.98:9092,172.25.0.137:9092"
consumerTopic <- "netlogo"
kafkaBootstrapServers <- "192.168.0.256:9092,192.168.0.257:9092,192.168.0.258:9092"
consumerTopic <- "topic1"
streamObj <- read.stream(source = "kafka", kafka.bootstrap.servers = kafkaBootstrapServers, subscribe = consumerTopic, startingOffsets="earliest")
printSchema(streamObj)
```
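The `convertStructToDf` function shown further down reads from a column named `jsontostructs(value)`, which suggests the raw Kafka value is first parsed with `from_json`. A minimal sketch of that intermediate step follows; the field names are assumptions based on the `getField()` calls in `convertStructToDf`, and the generated column name can differ across Spark versions:

```{r}
# Sketch only: parse the raw Kafka `value` column into a struct column.
# Field names/types are assumed from the getField() calls further down.
payloadSchema <- structType(structField("bannerId", "string"),
                            structField("mobile", "string"))

streamObj <- selectExpr(streamObj, "CAST(value AS STRING) AS value")
streamObj <- select(streamObj, from_json(streamObj$value, payloadSchema))

printSchema(streamObj)
```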
@@ -95,7 +95,7 @@ Users can define their own functions and use them as part of the pipeline. These

```{r}
# Function to convert datatype json struct to colums
# Function to convert datatype json struct to columns
convertStructToDf <- function(streamObj) {
streamObj <- SparkR::select(streamObj,list(getField(streamObj$`jsontostructs(value)`,"bannerId"),
getField(streamObj$`jsontostructs(value)`,"mobile"),
@@ -131,7 +131,7 @@ castDfColumns <- function(streamObj) {
return (streamObj)
}
# Function to convert datatype json struct to colums
# Function to convert the DataFrame into Kafka key-value pairs
convertDfToKafkaKeyValuePairs <- function (streamObj, kafkaKey) {
streamObj <- SparkR::toJSON(streamObj)
streamObj$key <- kafkaKey
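A minimal sketch of writing the key/value pairs produced by `convertDfToKafkaKeyValuePairs` (shown only partially in this hunk) back to Kafka with `SparkR::write.stream()`; the topic name and checkpoint path are placeholders, and the vignette may drive this step through a pipeline object rather than calling SparkR directly:

```{r}
# Sketch only: write the key/value DataFrame back to Kafka.
# `topic` and `checkpointLocation` below are illustrative placeholders.
outputStream <- write.stream(streamObj,
                             source = "kafka",
                             kafka.bootstrap.servers = kafkaBootstrapServers,
                             topic = "topic2",
                             checkpointLocation = "/tmp/kafka-checkpoint")

# Block until the streaming query terminates (or stop it with stopQuery())
awaitTermination(outputStream)
```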
