Flink on YARN with CDH: A Guide to Distributed Stream Processing
In today's era of big data, stream processing has become an essential component for real-time analytics and data processing. Apache Flink is a powerful and versatile framework for distributed stream processing, capable of handling massive volumes of data with low latency. In this article, we will explore how to set up and run Flink on YARN in a CDH (Cloudera Distribution including Apache Hadoop) cluster.
Before diving into the code examples, let's first understand the architecture and components involved in running Flink on YARN. Flink primarily consists of two main components: the JobManager and the TaskManager. The JobManager is responsible for coordinating the execution of jobs, while the TaskManager executes the computational tasks assigned by the JobManager.
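The division of labor can be sketched with a toy analogy in plain Java (this is illustrative code, not Flink's actual implementation): a coordinator splits a job into subtasks and hands them to a fixed pool of workers, much as the JobManager assigns parallel subtasks to TaskManager slots and collects their results.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ToyCluster {
    // "JobManager": split a job into subtasks and dispatch them to the workers
    public static List<String> runJob(int taskManagers, int subtasks) throws Exception {
        // "TaskManagers": a fixed pool of workers executing assigned tasks
        ExecutorService pool = Executors.newFixedThreadPool(taskManagers);
        List<Future<String>> futures = new ArrayList<>();
        for (int i = 0; i < subtasks; i++) {
            final int id = i;
            futures.add(pool.submit(() -> "subtask-" + id + " done"));
        }
        List<String> results = new ArrayList<>();
        for (Future<String> f : futures) {
            results.add(f.get()); // coordinator collects each subtask's result
        }
        pool.shutdown();
        return results;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runJob(2, 4));
        // prints [subtask-0 done, subtask-1 done, subtask-2 done, subtask-3 done]
    }
}
```

The analogy is loose (real TaskManagers are separate JVM processes on YARN containers, not threads), but the coordinator/worker split is the same.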
To run Flink on YARN, we need to configure the necessary dependencies and settings. Here's an example of a flink-conf.yaml file:
# Flink Configuration
jobmanager.rpc.address: localhost
jobmanager.rpc.port: 6123
yarn.application.name: Flink on YARN
yarn.application.queue: default
jobmanager.memory.process.size: 1024m
taskmanager.memory.process.size: 1728m
yarn.containers.vcores: 1
In the above configuration, we specify the RPC address and port for the JobManager (on YARN these are assigned dynamically at deployment time, so they mainly matter for standalone setups), the YARN application name, the YARN queue to submit to, the total process memory for the JobManager and each TaskManager, and the number of vcores per container. Exact key names vary across Flink versions; the keys above follow Flink 1.10 and later.
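Note that flink-conf.yaml is a flat list of key: value pairs rather than deeply nested YAML. A minimal sketch of how such a file can be read into a map (plain Java for illustration; Flink's own loader is GlobalConfiguration, and this parser handles only the flat form shown above):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class FlatConfParser {
    // Parses flat "key: value" lines, skipping blank lines and '#' comments
    public static Map<String, String> parse(String contents) {
        Map<String, String> conf = new LinkedHashMap<>();
        for (String line : contents.split("\n")) {
            String trimmed = line.trim();
            if (trimmed.isEmpty() || trimmed.startsWith("#")) continue;
            int colon = trimmed.indexOf(": ");
            if (colon < 0) continue;
            conf.put(trimmed.substring(0, colon), trimmed.substring(colon + 2).trim());
        }
        return conf;
    }

    public static void main(String[] args) {
        String yaml = "# Flink Configuration\n"
                + "jobmanager.rpc.port: 6123\n"
                + "yarn.application.name: Flink on YARN\n";
        Map<String, String> conf = parse(yaml);
        System.out.println(conf.get("jobmanager.rpc.port"));   // prints 6123
        System.out.println(conf.get("yarn.application.name")); // prints Flink on YARN
    }
}
```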
Now that we have set up Flink on YARN, let's see how we can run a simple Flink job using the Flink YARN API. In this example, we will count the occurrences of words in a stream of text.
First, we create a StreamExecutionEnvironment and define the word-count pipeline (the environment is bound to YARN later, when we deploy the cluster and submit the job graph):
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
env.readTextFile("hdfs:///path/to/text/file")
    .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
        for (String word : line.split(" ")) {
            out.collect(new Tuple2<>(word, 1));
        }
    })
    .returns(Types.TUPLE(Types.STRING, Types.INT))
    .keyBy(value -> value.f0)
    .sum(1)
    .print();
In the above code snippet, we set the parallelism to 2, read the text file from HDFS with readTextFile, split each line into words, and emit them as (word, 1) pairs. Because Java erases the lambda's Tuple2 type parameters at compile time, we give Flink an explicit type hint with returns(). We then key the stream by word and sum the counts for each key; since this is a streaming job, sum(1) emits an updated running total each time a word arrives. Finally, we print the results to the console.
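To make the dataflow's semantics concrete, here is what the flatMap/keyBy/sum pipeline computes, expressed over an in-memory list (an illustrative plain-Java sketch, not Flink code):

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class WordCountSketch {
    // flatMap: split each line into words;
    // keyBy + sum: fold the (word, 1) pairs into per-word totals
    public static Map<String, Integer> count(List<String> lines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(count(Arrays.asList("to be or", "not to be")));
        // prints {to=2, be=2, or=1, not=1}
    }
}
```

Unlike this batch-style sketch, which produces one final count per word, the streaming job emits a new running total for a word every time another occurrence arrives.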
To submit this job to YARN, we need to create a YarnClusterDescriptor and configure it with the necessary settings:
Configuration flinkConfig = new Configuration();
flinkConfig.setString("yarn.application.name", "Flink Word Count");
flinkConfig.setString("yarn.application.queue", "default");
YarnConfiguration yarnConfig = new YarnConfiguration();
YarnClient yarnClient = YarnClient.createYarnClient();
yarnClient.init(yarnConfig);
yarnClient.start();
YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(
        flinkConfig,
        yarnConfig,
        yarnClient,
        YarnClientYarnClusterInformationRetriever.create(yarnClient),
        true);
// Path to the Flink distribution jar (flink-dist), not the user job jar
yarnClusterDescriptor.setLocalJarPath(new Path("/path/to/flink-dist.jar"));
ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
        .setMasterMemoryMB(1024)
        .setTaskManagerMemoryMB(4096)
        .setSlotsPerTaskManager(2)
        .createClusterSpecification();
ClusterClient<ApplicationId> clusterClient =
        yarnClusterDescriptor.deploySessionCluster(clusterSpecification).getClusterClient();
JobGraph jobGraph = env.getStreamGraph().getJobGraph();
clusterClient.submitJob(jobGraph).get();
In the above code snippet, we create a Configuration object with the YARN application name and queue, start a YarnClient against the cluster's Hadoop configuration, and instantiate a YarnClusterDescriptor. The five-argument constructor shown matches Flink 1.10 and later; this is an internal API whose signature changes between releases, so check it against your Flink version. We point setLocalJarPath at the Flink distribution jar, define the cluster specification, including the memory and slots per TaskManager (recent Flink versions allocate TaskManagers on demand, so their number is no longer fixed up front), deploy a Flink session cluster on YARN, and submit the job graph to it. In day-to-day use, the command-line client achieves the same result with far less ceremony: flink run -m yarn-cluster /path/to/flink-job.jar.
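One practical check when sizing the cluster specification: every parallel subtask of the job occupies one task slot, and each TaskManager offers slotsPerTaskManager slots, so the number of TaskManagers YARN must provide follows from simple arithmetic. A back-of-the-envelope sketch (illustrative only, not Flink's actual scheduler logic):

```java
public class SlotMath {
    // Every parallel subtask needs one slot; each TaskManager offers a fixed
    // number of slots, so this many TaskManagers must be started (ceil division)
    public static int taskManagersNeeded(int parallelism, int slotsPerTaskManager) {
        return (parallelism + slotsPerTaskManager - 1) / slotsPerTaskManager;
    }

    public static void main(String[] args) {
        // parallelism 2 (from env.setParallelism(2)), 2 slots per TaskManager
        System.out.println(taskManagersNeeded(2, 2)); // prints 1
        System.out.println(taskManagersNeeded(8, 2)); // prints 4
    }
}
```

If the requested parallelism exceeds the total slots available, the job cannot be scheduled, so it is worth running this check before choosing the specification values.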
In this article, we explored how to set up and run Flink on YARN in a CDH cluster. We saw how to configure Flink to use YARN as the execution environment and how to submit a Flink job to YARN using the Flink YARN API. By leveraging the power of Flink and the scalability of YARN, we can process massive volumes of data in real-time with low latency.
By providing a simple and flexible API, Flink enables developers to build complex stream processing applications with ease. With the ability to run on YARN within a CDH cluster, Flink becomes an even more powerful tool for big data processing.