You can find the recording here - CDF-Recording.
To keep the workshop within the time available, we have already taken care of some of the steps that must be in place before the actual workshop steps can begin. The prerequisites are:
(1) Streams Messaging Data Hub cluster should be created and running.
(2) Streaming Analytics Data Hub cluster should be created and running.
(3) A data provider should be configured in SQL Stream Builder.
(4) Have access to the file syslog-to-kafka.json.
(5) The environment should be enabled as part of the CDF data service.
Step 0 covers verifying access and connections before we begin with the actual steps. Your instructor will guide you through this.
(1) Credentials: Participants must enter their First Name, Last Name and Company details and make a note of the corresponding Workshop Login Username, Workshop Login Password and CDP Workload User to be used in this workshop.
Please use the login URL (Workshop login) in case you are not logged in. Enter the Workshop Login Username and Workshop Login Password that you obtained as part of Step 0. (Note: your Workshop Login Username would be something like wuser00@workshop.com and not just wuser00.)
You should now see the following home page of CDP Public Cloud.
You will need to define your workload password that will be used to access non-SSO interfaces. Please keep a note of this workload password. If you have forgotten it, you will be able to repeat this process and define another one.
You may read more about workload passwords here.
- Click on your user name (Ex: wuser00@workshop.com) at the lower left corner.
- Click on the Profile option.
- Click the Set Workload Password option.
- Enter a suitable Password and Confirm Password.
- Click the Set Workload Password button.
Check that you get the message "Workload password is currently set", or alternatively look for the message "(Workload password is currently set)" next to Workload Password.
NOTE: THESE STEPS HAVE ALREADY BEEN DONE FOR YOU. THIS SECTION WILL WALK YOU THROUGH HOW PERMISSIONS/POLICIES ARE MANAGED IN RANGER. PLEASE DO NOT EXECUTE THE STEPS IN THIS SECTION OR CHANGE ANYTHING.
Verify that the user group workshop-users (all users performing the workshop are part of this group) is present in both the all-consumergroup and all-topic policies.
In Ranger, select the SCHEMA-REGISTRY repository that's associated with the Streams Messaging Data Hub.
Scroll to the top of the repository page (https://github.com/DashDipti/cdf-workshop), click on <> Code, and then choose the Download ZIP option.
Creating a data flow for CDF-PC is the same process as creating any data flow within NiFi, with three very important requirements.
(a) The data flow that would be used for CDF-PC must be self-contained within a process group.
(b) Data flows for CDF-PC must use parameters for any property on a processor that is modifiable, e.g. user names, Kafka topics, etc.
(c) All queues need to have meaningful names (instead of Success, Fail, and Retry). These names will be used to define Key Performance Indicators in CDF-PC.
Configure Parameters: Parameters can be reused multiple times within the flow and are also configurable at deployment time. There are two options available: Add Parameter, used for specifying non-sensitive values, and Add Sensitive Parameter, used for specifying sensitive values such as passwords.
Add the following parameter.
Name: CDP Workload User
Value: The username assigned to you. (Ex: wuser00)
Click on Apply.
Add the following parameter.
Name: CDP Workload User Password
Value: The workload password set by you in 'Step 1.1: Define Workload Password'.
Click on Apply.
Now that we have created these parameters, we can easily search for and reuse them within our dataflow. This is useful for CDP Workload User and CDP Workload User Password.
NOTE ONLY: To search for existing parameters:
- Open a processor's configuration and proceed to the Properties tab.
- Enter: #{
- Hit Ctrl+Spacebar.
This will bring up a list of existing parameters that are not tagged as sensitive.
Let's go back to the canvas to start designing our flow. This flow will contain two processors:
GenerateFlowFile: Generates random data.
PutCDPObjectStore: Loads data into HDFS (S3).
Add GenerateFlowFile processor: Pull a Processor onto the canvas and type GenerateFlowFile in the text box; once the processor appears, click on Add.
Configure GenerateFlowFile processor: The GenerateFlowFile processor will now be on your canvas; configure it by right-clicking on it and selecting Configuration. Fill in the values in the right window pane to configure the processor as follows.
Processor Name: DataGenerator
Scheduling Strategy: Timer Driven
Run Duration: 0ms
Run Schedule: 30 sec
Execution: All Nodes
Properties: Custom Text
<26>1 2021-09-21T21:32:43.967Z host1.example.com application4 3064 ID42 [exampleSDID@873 iut="4" eventSource="application" eventId="58"] application4 has stopped unexpectedly
The above represents a syslog message in RFC 5424 format. Subsequent portions of this workshop will leverage this same syslog format.
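To make the structure of that line concrete, here is a minimal, purely illustrative Python sketch (not the record reader NiFi actually uses) that splits the sample message into its RFC 5424 parts and derives facility and severity from the priority value (PRI = facility * 8 + severity):

import re

# The sample syslog line from the Custom Text property above.
SAMPLE = ('<26>1 2021-09-21T21:32:43.967Z host1.example.com application4 3064 ID42 '
          '[exampleSDID@873 iut="4" eventSource="application" eventId="58"] '
          'application4 has stopped unexpectedly')

# <PRI>VERSION TIMESTAMP HOSTNAME APP-NAME PROCID MSGID [STRUCTURED-DATA] MSG
PATTERN = re.compile(
    r'<(?P<pri>\d+)>(?P<version>\d+) (?P<timestamp>\S+) (?P<hostname>\S+) '
    r'(?P<appname>\S+) (?P<procid>\S+) (?P<msgid>\S+) \[(?P<sd>[^\]]*)\] (?P<body>.*)'
)

fields = PATTERN.match(SAMPLE).groupdict()
facility, severity = divmod(int(fields['pri']), 8)  # PRI 26 -> facility 3, severity 2

print(facility, severity)                            # 3 2
print(fields['hostname'], fields['appname'])         # host1.example.com application4
print(fields['sd'])                                  # exampleSDID@873 iut="4" ...
print(fields['body'])                                # application4 has stopped unexpectedly

These are exactly the fields (priority, severity, facility, hostname, appName, structured data, body, and so on) that the schema and flow in the later sections work with.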
Add PutCDPObjectStore processor: Pull a new Processor onto the canvas and type PutCDPObjectStore in the text box; once the processor appears, click on Add.
Configure PutCDPObjectStore processor: The PutCDPObjectStore processor will now be on your canvas; configure it by right-clicking on it and selecting Configuration. Configure the processor in the following way.
Processor Name: Move2S3
Scheduling Strategy: Timer Driven
Run Duration: 0ms
Run Schedule: 0 sec
Execution: All Nodes
Properties:
Directory: #{S3 Directory}
CDP Username: #{CDP Workload User}
CDP Password: #{CDP Workload User Password}
Relationships: Check the Terminate box under success.
Create connection between processors: Connect the two processors by dragging the arrow from the DataGenerator processor to the Move2S3 processor and selecting the success relationship. Then click Add.
Naming the queue: Providing unique names to all queues is very important, as they are used to define the Key Performance Indicators (KPIs) upon which CDF-PC will auto scale. To name a queue, double-click the queue and give it a unique name. A best practice is to start with the existing relationship name (i.e. success, failure, retry, etc.) and add the source and destination processor information.
Testing the Data Flow: To test the flow we first need to start a test session. Click on Flow Options at the top right corner and then click Start under the Test Session section.
Activation should take a couple of minutes. While this happens, you will see the following at the top right corner of your screen.
Once the Test Session is ready you will see the following message on the top right corner of your screen.
After the flow has been created and tested, we can now Publish the flow to the Flow Catalog. Stop the current test session by clicking on the green tab at the top right corner indicating Active Test Session, then click on End.
Once the session stops, click on Flow Options at the top right corner of your screen and click on Publish.
Go to the Catalog and search for the flow that you just published by typing its name.
Select the CDP Target Environment from the drop-down. Make sure you select the environment given by the instructor (Ex: emeaworkshop-environ). Click Continue.
Deployment Name: Give a unique name to the deployment and click Next →.
Deployment Name: {user_id}_flow_prod (Ex: wuser00_flow_prod)
Set the Parameters and click Next.
CDP Workload User: The username assigned to you. (Ex: wuser00)
CDP Workload User Password: The workload password set by you in 'Step 1.1: Define Workload Password'.
CDP Environment: DummyParameter
S3 Directory: LabData
Set the cluster size: Select the Extra Small size and click Next. In this step you can configure how your flow will auto scale, but keep it disabled for this lab.
Add Key Performance Indicators: Set up KPIs to track specific performance metrics of a deployed flow. Click on Add New KPI.
In the Add New KPI window, fill in the details as below.
KPI Scope: Connection
Connection Name: failure_Move2S3
Metric to Track: Percent Full
Check the box against Trigger alert when metric is greater than: 50 Percent
Alert will be triggered when metric is outside the boundary(s) for: 2 Minutes
Click on Add.
Click Deploy.
The Deployment Initiated message will be displayed. Wait until the flow deployment is completed, which might take a few minutes.
Manage KPI and Alerts: Click on the KPI and Alerts tab under Deployment Settings to get the list of KPIs that have been set. You also have an option to modify or add more KPIs to your flow here.
Manage Parameters: The parameters that we created earlier can be managed from the Parameters tab. Click on Parameters.
NiFi Configurations: If you have set any NiFi-specific configurations, they will show up in the NiFi Configuration tab.
The purpose of this workshop is to demonstrate how existing NiFi flows developed externally (e.g. on developers' laptops, or pushed from a code repository) can be migrated to CDF-PC. This workshop will leverage an existing NiFi flow template that has been designed with the best practices for CDF-PC flow deployment.
The existing NiFi flow will perform the following actions.
- Generate random syslogs in RFC 5424 format.
- Convert the incoming data to JSON using record writers.
- Apply a SQL filter to the JSON records.
- Send the transformed syslog messages to Kafka.
Note that a parameter context has already been defined in the flow and the queues have been uniquely named.
For this we will be leveraging the Data Hubs which have already been created: ssb-analytics-cluster-emea and kafka-smm-cluster-emea. Note that the above names might be different depending upon your environment.
Go to the Environments tab as shown in the screenshot. Click on your environment (Ex: emeaworkshop-environ).
Log in to Streams Messaging Manager by clicking the appropriate hyperlink in the Streams Messaging Data Hub (Ex: kafka-smm-cluster-emea).
Create a topic with the following parameters and then click Save.
Name: <username>_syslog (Ex: wuser00_syslog)
Partitions: 1
Availability: MODERATE
Cleanup Policy: delete
Note: The flow will not work if you set the Cleanup Policy to anything other than delete. This is because we are not specifying keys when writing to Kafka.
We will require the broker list to configure our processors to connect to the Kafka brokers, which allows consumers to connect and fetch messages by partition, topic, or offset. This information can be found in the Data Hub cluster associated with Streams Messaging Manager. Later in the lab, we will need this list of Kafka brokers (already configured in this environment) at hand so that our data flow can publish to our Kafka topics.
Example:
kafka-smm-cluster-emea-corebroker2.emeawork.dp5i-5vkq.cloudera.site:9093
kafka-smm-cluster-emea-corebroker0.emeawork.dp5i-5vkq.cloudera.site:9093
kafka-smm-cluster-emea-corebroker1.emeawork.dp5i-5vkq.cloudera.site:9093
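For reference only, the sketch below shows how an external Python client could use this broker list. It assumes the confluent-kafka package and the same SASL_SSL / PLAIN settings with your CDP workload credentials that are configured for SQL Stream Builder later in this lab; the broker hostnames, topic and credentials are placeholders for the values from your own environment.

from confluent_kafka import Producer

# Placeholders: substitute the broker list, workload credentials and topic
# name from your own environment.
conf = {
    "bootstrap.servers": ",".join([
        "kafka-smm-cluster-emea-corebroker0.emeawork.dp5i-5vkq.cloudera.site:9093",
        "kafka-smm-cluster-emea-corebroker1.emeawork.dp5i-5vkq.cloudera.site:9093",
        "kafka-smm-cluster-emea-corebroker2.emeawork.dp5i-5vkq.cloudera.site:9093",
    ]),
    "security.protocol": "SASL_SSL",   # the brokers listen with TLS on port 9093
    "sasl.mechanism": "PLAIN",
    "sasl.username": "wuser00",        # your CDP workload user
    "sasl.password": "<workload-password>",
}

producer = Producer(conf)
producer.produce("wuser00_syslog", value=b'{"severity": 2, "appName": "application4"}')
producer.flush()  # block until delivery completes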
You now need to work on Schema Registry. Log in to Schema Registry by clicking the appropriate hyperlink in the Streams Messaging Data Hub (Ex: kafka-smm-cluster-emea).
Create a new schema with the following information.
Name: <username>_syslog (Ex: wuser00_syslog)
Description: syslog schema for dataflow workshop
Type: Avro schema provider
Schema Group: Kafka
Compatibility: Backward
Evolve: True
Schema Text: Copy and paste the schema text below into the Schema Text field.
{
  "name": "syslog",
  "type": "record",
  "namespace": "com.cloudera",
  "fields": [
    {
      "name": "priority",
      "type": "int"
    },
    {
      "name": "severity",
      "type": "int"
    },
    {
      "name": "facility",
      "type": "int"
    },
    {
      "name": "version",
      "type": "int"
    },
    {
      "name": "timestamp",
      "type": "long"
    },
    {
      "name": "hostname",
      "type": "string"
    },
    {
      "name": "body",
      "type": "string"
    },
    {
      "name": "appName",
      "type": "string"
    },
    {
      "name": "procid",
      "type": "string"
    },
    {
      "name": "messageid",
      "type": "string"
    },
    {
      "name": "structuredData",
      "type": {
        "name": "structuredData",
        "type": "record",
        "fields": [
          {
            "name": "SDID",
            "type": {
              "name": "SDID",
              "type": "record",
              "fields": [
                {
                  "name": "eventId",
                  "type": "string"
                },
                {
                  "name": "eventSource",
                  "type": "string"
                },
                {
                  "name": "iut",
                  "type": "string"
                }
              ]
            }
          }
        ]
      }
    }
  ]
}
Note: The name of the Kafka topic (Ex: wuser00_syslog) you previously created and the schema name must be the same.
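If you want to sanity-check a record against this schema outside of the workshop tooling, a minimal sketch is shown below. It assumes the fastavro Python package is available and that the schema text above has been saved to a local file named schema.avsc (a hypothetical file name); the record values simply mirror the sample syslog message used earlier.

import json
from fastavro import parse_schema
from fastavro.validation import validate

# schema.avsc contains the schema text shown above.
with open("schema.avsc") as f:
    schema = parse_schema(json.load(f))

# A record shaped like the sample syslog message from the GenerateFlowFile step.
record = {
    "priority": 26,
    "severity": 2,
    "facility": 3,
    "version": 1,
    "timestamp": 1632259963967,  # 2021-09-21T21:32:43.967Z as epoch milliseconds
    "hostname": "host1.example.com",
    "body": "application4 has stopped unexpectedly",
    "appName": "application4",
    "procid": "3064",
    "messageid": "ID42",
    "structuredData": {
        "SDID": {"eventId": "58", "eventSource": "application", "iut": "4"}
    },
}

# Returns True if the record matches the schema, raises ValidationError otherwise.
print(validate(record, schema))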
Open the CDF-PC data service and click on Catalog in the left tab. Select Import Flow Definition at the top right.
Add the following information.
Flow Name: <username>_syslog_to_kafka (Ex: wuser00_syslog_to_kafka)
Flow Description: Reads syslog in RFC 5424 format, applies a SQL filter, transforms the data into JSON records, and publishes to Kafka.
NiFi Flow Configuration: syslog-to-kafka.json (from the resources downloaded earlier)
Version Comments: Initial Version
Search for the flow in the Flow Catalog by typing the flow name that you created in the previous step.
Click on the flow; you should see the following. A Deploy option should appear shortly. Then click on Deploy.
Select the CDP Target Environment (Ex: emeaworkshop-environ) where this flow will be deployed, then click Continue.
Add the Flow Parameters as below. Note that you might have to navigate through multiple screens to fill them in. Then click Next.
CDP Workload User: The workload username for the current user. (Ex: wuser00)
CDP Workload Password: The workload password for the current user (this password was set by you earlier).
Filter Rule: SELECT * FROM FLOWFILE
Kafka Broker Endpoint: The comma-separated list of Kafka brokers previously noted, as shown below.
Example: kafka-smm-cluster-emea-corebroker2.emeawork.dp5i-5vkq.cloudera.site:9093,kafka-smm-cluster-emea-corebroker0.emeawork.dp5i-5vkq.cloudera.site:9093,kafka-smm-cluster-emea-corebroker1.emeawork.dp5i-5vkq.cloudera.site:9093
Kafka Destination Topic: <username>_syslog (Ex: wuser00_syslog)
Kafka Producer ID: nifi_dfx_p1
Schema Name: <username>_syslog (Ex: wuser00_syslog)
Schema Registry Hostname: The hostname of the master server in the Kafka Data Hub (Ex: kafka-smm-cluster-emea) (refer to the screenshot below).
Example: kafka-smm-cluster-emea-master0.emeawork.dp5i-5vkq.cloudera.site
On the next page, define sizing and scaling details and then click Next.
Size: Extra Small
Auto Scaling: Enabled
Min Nodes: 1
Max Nodes: 3
The purpose of this workshop is to demonstrate streaming analytic capabilities using SQL Stream Builder. We will leverage the NiFi Flow deployed in CDF-PC from the previous step and demonstrate how to query live data and subsequently sink it to another location. The SQL query will leverage the existing syslog schema in Schema Registry.
To run queries in SQL Stream Builder you need to have your keytab unlocked. This is mainly for authentication purposes. Because the credential you are using is sometimes reused by other people doing the same lab, it is possible that your keytab is already unlocked. We have shared the steps for both scenarios.
Click on Environments in the left pane. Click on the environment assigned to you (Ex: emeaworkshop-environ).
Click on the user name (Ex: wuser00) at the bottom left of the screen and select Manage Keytab. Make sure you are logged in as the username that was assigned to you.
Click on Environments in the left pane. Click on the environment assigned to you (Ex: emeaworkshop-environ).
Click on the user name (Ex: wuser00) at the bottom left of the screen and select Manage Keytab. Make sure you are logged in as the username that was assigned to you.
If your keytab is already unlocked, it is necessary to reset it here by locking it and unlocking it again using your newly set workload password. Enter your CDP Workload Username in Principal Name (Ex: wuser00) and click on Lock Keytab.
Now do the following.
Click on the user name (Ex: wuser00) at the bottom left of the screen and select Manage Keytab. Make sure you are logged in as the username that was assigned to you.
If you are not on the SQL Stream Builder interface, you can reach it by following the next two screenshots; otherwise, continue from the third screenshot.
Go to the SQL Stream Builder UI: The SSB interface can be reached from the Data Hub that is running Streaming Analytics, in our case ssb-analytics-cluster-emea.
Within the Data Hub, click on Streaming SQL Console.
Create a new project: Create a SQL Stream Builder (SSB) project by clicking New Project and using the following details.
Name: {user_id}_hol_workshop (Ex: wuser00_hol_workshop)
Description: SSB project to analyze streaming data.
Create Kafka Data Store: Create a Kafka data source by selecting Data Sources in the left pane, clicking on the three-dotted icon next to Kafka, then selecting New Kafka Data Source.
Name: {user-id}_cdp_kafka (Ex: wuser00_cdp_kafka)
Brokers: (Comma-separated list as shown below)
kafka-smm-cluster-emea-corebroker2.emeawork.dp5i-5vkq.cloudera.site:9093,kafka-smm-cluster-emea-corebroker0.emeawork.dp5i-5vkq.cloudera.site:9093,kafka-smm-cluster-emea-corebroker1.emeawork.dp5i-5vkq.cloudera.site:9093
Protocol: SASL/SSL
SASL Username: <workload-username> (Ex: wuser00)
SASL Mechanism: PLAIN
SASL Password: The workload password set by you in Step 1.1: Define Workload Password
Create Kafka Table: Create a Kafka table by selecting Virtual Tables in the left pane, clicking on the three-dotted icon next to it, then clicking on New Kafka Table.
Configure the Kafka table using the details below.
Table Name: {user-id}_syslog_data (Ex: wuser00_syslog_data)
Kafka Cluster: <select the Kafka data source you created previously> (Ex: wuser00_cdp_kafka)
Data Format: JSON
Topic Name: <select the topic you created earlier in Streams Messaging Manager>
When you select AVRO as the Data Format, you must provide the correct Schema Definition when creating the table for SSB to be able to successfully process the topic data. For JSON tables, though, SSB can look at the data flowing through the topic and try to infer the schema automatically, which is quite handy at times. Of course, there must already be data in the topic for this feature to work correctly.
Note: SSB tries its best to infer the schema correctly, but this is not always possible and sometimes data types are inferred incorrectly. You should always review the inferred schema to check that it is correct and make the necessary adjustments.
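As a toy illustration of what inference involves (this is not SSB's actual logic), the sketch below maps the Python types of a sample JSON record to SQL-style column types. Note how the numeric timestamp field comes out as a plain BIGINT rather than a TIMESTAMP, which is exactly the kind of result you may need to adjust, as you will do in the Event Time step below.

import json

# A record as it might appear on the Kafka topic (illustrative values only).
sample = json.loads("""
{
  "priority": 26, "severity": 2, "facility": 3, "version": 1,
  "timestamp": 1632259963967,
  "hostname": "host1.example.com",
  "appName": "application4",
  "body": "application4 has stopped unexpectedly"
}
""")

# A very naive Python-type -> SQL-type mapping, just to show the idea.
SQL_TYPE = {bool: "BOOLEAN", int: "BIGINT", float: "DOUBLE", str: "STRING"}

for column, value in sample.items():
    print(f"{column:<12} {SQL_TYPE.get(type(value), 'STRING')}")

# "timestamp" is reported as BIGINT here, not TIMESTAMP -- review and adjust.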
Since you are reading data from a JSON topic, go ahead and click on Detect Schema to get the schema inferred. You should see the schema updated in the Schema Definition tab.
You will also notice that a "Schema is invalid" message appears upon the schema detection. If you hover the mouse over the message, it shows the reason.
You will fix this in the next step.
Each record read from Kafka by SSB has an associated timestamp column of data type TIMESTAMP ROWTIME. By default, this timestamp is sourced from the internal timestamp of the Kafka message and is exposed through a column called eventTimestamp. However, if your message payload already contains a timestamp associated with the event (event time), you may want to use that instead of the Kafka internal timestamp.
In this case, the syslog message has a field called timestamp that contains the timestamp you should use. You want to expose this field as the table's event_time column. To do this, click on the Event Time tab and enter the following properties.
Use Kafka Timestamps: Disable
Input Timestamp Column: timestamp
Event Time Column: event_time
Watermark Seconds: 3
Now that you have configured the event time column, click on Detect Schema again. You should see the schema turn valid.
Create a Flink job by selecting Jobs in the left pane, clicking on the three-dotted icon next to it, then clicking on New Job.
Add the following SQL statement in the editor. It keeps only records with severity 3 (error) or lower, i.e. the most severe syslog events, since lower severity numbers are more severe in syslog:
SELECT * FROM {user-id}_syslog_data WHERE severity <= 3