Deployment Guide

At present, BitSail supports Flink deployment on Yarn and on native Kubernetes.



Yarn Deployment

Below is a step-by-step guide to help you deploy BitSail on Yarn effectively.

Pre-configuration

Configure Hadoop Environment

To support Yarn deployment, the HADOOP_CLASSPATH environment variable has to be set. There are two ways to set it:

  1. Set HADOOP_CLASSPATH directly.

  2. Set HADOOP_HOME to point to the Hadoop directory in the deployment environment. The bitsail startup script will then use the following command to generate HADOOP_CLASSPATH.

if [ -n "$HADOOP_HOME" ]; then
  export HADOOP_CLASSPATH=$($HADOOP_HOME/bin/hadoop classpath)
fi
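
For the first option, a minimal sketch looks like this (the Hadoop install path below is only an assumption; use your own):

# Option 1: derive HADOOP_CLASSPATH directly from an existing Hadoop installation
export HADOOP_CLASSPATH=$(/usr/local/hadoop-3.1.1/bin/hadoop classpath)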

After packaging, the build output contains a file conf/bitsail.conf. This file describes the system configuration of the deployment environment, including the Flink path and some other default parameters.

Here are some frequently-used options in the configuration file. All of them share the prefix sys.flink.:

  • flink_home: The root dir of Flink.
    Example: ${BITSAIL_HOME}/embedded/flink
  • checkpoint_dir: The path storing the metadata and data files of checkpoints (reference: Flink Checkpoints).
    Example: "hdfs://opensource/bitsail/flink-1.11/checkpoints/"
  • flink_default_properties: General Flink runtime options configured by "-D". Example:
    classloader.resolve-order: "child-first"
    akka.framesize: "838860800b"
    rest.client.max-content-length: 838860800
    rest.server.max-content-length: 838860800

Submit to Yarn

You can use the startup script bin/bitsail to submit Flink jobs to Yarn.

The specific commands are as follows:

bash ./bin/bitsail run --engine flink --conf [job_conf_path] --execution-mode run --queue [queue_name] --deployment-mode yarn-per-job [--priority [yarn_priority] -p/--props [name=value]] 

Parameter description

  • Required parameters
    • queue_name: Target yarn queue
    • job_conf_path: Path of job configuration file
  • Optional parameters (a combined example follows this list)
    • yarn_priority: Job priority on yarn
    • name=value: Flink properties, for example classloader.resolve-order=child-first
      • name: Property key, i.e. a configurable Flink parameter that will be passed through to the Flink job.
      • value: Property value.
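
Combining the optional parameters above, a submission that sets a Yarn priority and passes one Flink property through might look like this (the queue name and priority value are placeholders):

bash ./bin/bitsail run --engine flink \
  --conf examples/Fake_Print_Example.json \
  --execution-mode run \
  --queue default \
  --deployment-mode yarn-per-job \
  --priority 5 \
  -p classloader.resolve-order=child-first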

Submit an example job

Submit a test job that reads from a Fake source and writes to a Print sink to Yarn.

bash ./bin/bitsail run --engine flink --conf ~/bitsail-archive-0.2.0-SNAPSHOT/examples/Fake_Print_Example.json --execution-mode run -p 1=1  --deployment-mode yarn-per-job  --queue default

Log for Debugging

Client side log file

Please check the ${FLINK_HOME}/log/ folder for the log file of the BitSail client.

Yarn task log file

Please go to Yarn WebUI to check the logs of Flink JobManager and TaskManager.


Submit to Remote Flink Session

Suppose that the BitSail install path is ${BITSAIL_HOME}.

After building BitSail, we can enter the following path and find runnable jars and example job configuration files:

cd ${BITSAIL_HOME}/bitsail-dist/target/bitsail-dist-0.2.0-SNAPSHOT-bin/bitsail-archive-0.2.0-SNAPSHOT/

Users can use the option --deployment-mode remote to submit a BitSail job to a remote Flink session. Use examples/Fake_Print_Example.json as an example to start a BitSail job:

  • <job-manager-address>: the address of the JobManager, in host:port form, e.g. localhost:8081.
bash bin/bitsail run \
  --engine flink \
  --execution-mode run \
  --deployment-mode remote \
  --conf examples/Fake_Print_Example.json \
  --jm-address <job-manager-address>

For example, we can use the script bitsail-archive-0.2.0-SNAPSHOT/embedded/flink/bin/start-cluster.sh to start a standalone session. Then we can run the example with the following command:

bash bin/bitsail run \
  --engine flink \
  --execution-mode run \
  --deployment-mode remote \
  --conf examples/Fake_Print_Example.json \
  --jm-address localhost:8081

Then you can visit the Flink WebUI to see the running job. The output of the Fake_to_Print job appears in the TaskManager's stdout.

Run in Local Mini-Cluster

Users can use the option --deployment-mode local to run a BitSail job locally. Use examples/Fake_Print_Example.json as an example to start a BitSail job:

bash bin/bitsail run \
  --engine flink \
  --execution-mode run \
  --deployment-mode local \
  --conf examples/Fake_Print_Example.json

Run Fake_to_Hive example

Take examples/Fake_Hive_Example.json as another example:

  • Remember to fill in the job configuration with an available Hive table before running the command:
    • job.writer.db_name: the Hive database to write to.
    • job.writer.table_name: the Hive table to write to.
    • job.writer.metastore_properties: add the Hive metastore address to it, like:
       {
          "job": {
            "writer": {
              "metastore_properties": "{\"hive.metastore.uris\":\"thrift://localhost:9083\"}"
            }
          }
       }
    

Then you can use a similar command to submit the BitSail job to the local mini-cluster:

bash bin/bitsail run \
  --engine flink \
  --execution-mode run \
  --deployment-mode local \
  --conf examples/Fake_Hive_Example.json

When the reader or writer data source relates to Hadoop, e.g., a hive_to_print job, the Hadoop libs are needed. There are two ways to provide Hadoop libs for the local mini-cluster:

  1. If you already have a local Hadoop environment, you can directly set $HADOOP_HOME to the folder of your Hadoop libs. For example:
export HADOOP_HOME=/usr/local/hadoop-3.1.1
  2. If there is no Hadoop environment, you can use flink-shaded-hadoop. Remember to move the uber jar into your Flink lib dir. For example, suppose the Flink root dir is /opt/flink:
# download flink-shaded-hadoop-uber jar
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.7.5-10.0/flink-shaded-hadoop-2-uber-2.7.5-10.0.jar

# move to flink libs
mv flink-shaded-hadoop-2-uber-2.7.5-10.0.jar /opt/flink/lib/flink-shaded-hadoop-uber.jar

Native Kubernetes Deployment

At present, BitSail supports native Kubernetes via the Flink 1.11 engine.

Below is a step-by-step guide to help you deploy it effectively on native Kubernetes. Currently, BitSail supports the Application deployment mode, which allows users to create a single image containing their job and the Flink runtime; the cluster components are created and destroyed automatically as needed.

Prerequisites

  1. Kubernetes >= 1.9
  2. KubeConfig, which has access to list, create, and delete pods and services, configurable via ~/.kube/config. You can verify permissions by running kubectl auth can-i <list|create|edit|delete> pods (see the example after this list).
  3. Kubernetes DNS enabled
  4. Have the compiled BitSail artifacts ready (after building with ${BITSAIL_HOME}/build.sh, the artifacts will be located in ${BITSAIL_HOME}/output/)
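
As a quick check for item 2, you can run the permission queries from your client machine, for example:

# verify that the current KubeConfig context can manage pods
kubectl auth can-i list pods
kubectl auth can-i create pods
kubectl auth can-i delete pods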

If you have problems setting up a Kubernetes cluster, take a look at how to set up a Kubernetes cluster.

Pre-configuration

Setup RBAC

Role-based access control (RBAC) is a method of regulating access to compute or network resources based on the roles of individual users within an enterprise. Users can configure RBAC roles and service accounts used by the JobManager to access the Kubernetes API server within the Kubernetes cluster.

Every namespace has a default service account. However, the default service account may not have the permission to create or delete pods within the Kubernetes cluster. Users can instead use the following commands to create a new service account <self-defined-service-account> and set the role binding. Then use the config option kubernetes.service-account=<self-defined-service-account> to make the JobManager pod use the <self-defined-service-account> service account to create/delete TaskManager pods and leader ConfigMaps. This also allows the TaskManager to watch the leader ConfigMaps to retrieve the addresses of the JobManager and ResourceManager.

$ kubectl create serviceaccount <self-defined-service-account> # Please replace <self-defined-service-account> with a custom name
$ kubectl create clusterrolebinding <self-defined-cluster-role-binding> --clusterrole=edit --serviceaccount=default:<self-defined-service-account> # Please replace <self-defined-service-account> and <self-defined-cluster-role-binding> with custom names
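
For illustration, with a hypothetical service account named bitsail-sa and a cluster role binding named bitsail-sa-edit, the two commands above become:

# create a dedicated service account in the default namespace (bitsail-sa is a placeholder name)
kubectl create serviceaccount bitsail-sa

# bind the built-in "edit" cluster role to it (bitsail-sa-edit is a placeholder name)
kubectl create clusterrolebinding bitsail-sa-edit --clusterrole=edit --serviceaccount=default:bitsail-sa

The JobManager would then be told to use it via -p kubernetes.jobmanager.service-account=bitsail-sa, as in the Start Application command below.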

Application Mode

Application mode allows users to create a single image containing their Job and the Flink runtime, which will automatically create and destroy cluster components as needed. The Flink community provides base docker images that can be customized for any use case.

Build your <CustomImage> using the Dockerfile from ${BITSAIL_HOME}/output/Dockerfile.

Publish your <CustomImage> onto Dockerhub so that the Kubernetes cluster can download it (see how to create and manage docker repositories):

docker build -t <your docker repository>:<tag> ${BITSAIL_HOME}/output/
docker push <your docker repository>:<tag>
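
As a concrete sketch with a hypothetical repository name and tag, the two steps might look like this:

# build the image from the BitSail build output (example-user/bitsail:0.2.0 is a placeholder)
docker build -t example-user/bitsail:0.2.0 ${BITSAIL_HOME}/output/

# push it so that Kubernetes nodes can pull the image
docker push example-user/bitsail:0.2.0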

Start Application

bash ${BITSAIL_HOME}/bin/bitsail run \
   --engine flink \
   --target kubernetes-application \
   --deployment-mode kubernetes-application \
   --execution-mode run-application \
   -p kubernetes.jobmanager.service-account=<self-defined-service-account> \
   -p kubernetes.container.image=<CustomImage> \
   -p kubernetes.jobmanager.cpu=0.25 \
   -p kubernetes.taskmanager.cpu=0.5 \
   --conf-in-base64 <base64 conf>
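
The --conf-in-base64 argument expects the job configuration file encoded as base64. One way to produce it on a Linux client (assuming GNU coreutils and the example job configuration from the build output) is:

# encode the job configuration without line wrapping
BASE64_CONF=$(base64 -w0 examples/Fake_Print_Example.json)

# then pass it to the run command as: --conf-in-base64 "${BASE64_CONF}"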

Users can specify more configurations by adding more -p key=value options to the bitsail command line.

Configurations:

  • kubernetes.cluster-id (Optional; default: bitsail-<instance-id>; type: String)
    The cluster-id, which should be no more than 45 characters, is used to identify a unique Flink cluster. If not set, the client will automatically generate it as a random numeric ID with the 'bitsail-' prefix.
  • kubernetes.cluster.jar.path (Optional; default: "/opt/bitsail/bitsail-core.jar"; type: String)
    The BitSail jar path in the Kubernetes cluster.
  • kubernetes.container.image (Required; default: depends on the actually running version, in general "flink:<FLINK_VERSION>-scala_<SCALA_VERSION>"; type: String)
    Image to use for BitSail containers. The specified image must be based upon the same Apache Flink and Scala versions as used by the application. Visit https://hub.docker.com/_/flink?tab=tags for the images provided by the Flink project.
  • kubernetes.container.image.pull-policy (Optional; default: IfNotPresent; type: Enum, possible values: [IfNotPresent, Always, Never])
    The Kubernetes container image pull policy. The default policy is IfNotPresent, to avoid putting pressure on the image repository.
  • kubernetes.container.image.pull-secrets (Optional; default: none; type: List<String>)
    A semicolon-separated list of the Kubernetes secrets used to access private image registries.
  • kubernetes.hadoop.conf.config-map.name (Optional; default: none; type: String)
    The name of an existing ConfigMap that contains custom Hadoop configuration to be mounted on the JobManager(s) and TaskManagers.
  • kubernetes.jobmanager.cpu (Optional; default: 1.0; type: Double)
    The number of cpus used by the JobManager.
  • kubernetes.jobmanager.service-account (Required; default: "default"; type: String)
    Service account used by the JobManager within the Kubernetes cluster. The JobManager uses this service account when requesting TaskManager pods from the API server.
  • kubernetes.namespace (Optional; default: "default"; type: String)
    The namespace that will be used for running the JobManager and TaskManager pods.
  • kubernetes.taskmanager.cpu (Optional; default: -1.0; type: Double)
    The number of cpus used by the TaskManager. By default, the cpu is set to the number of slots per TaskManager.
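
For instance, to pin the cluster-id and run in a dedicated namespace, the keys above are passed as extra -p options on the Start Application command (the cluster-id and namespace below are placeholders):

bash ${BITSAIL_HOME}/bin/bitsail run \
   --engine flink \
   --target kubernetes-application \
   --deployment-mode kubernetes-application \
   --execution-mode run-application \
   -p kubernetes.cluster-id=bitsail-demo \
   -p kubernetes.namespace=data-sync \
   -p kubernetes.jobmanager.service-account=<self-defined-service-account> \
   -p kubernetes.container.image=<CustomImage> \
   --conf-in-base64 <base64 conf>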

Stop Application

Users can go to Flink WebUI to cancel running jobs.

Alternatively, users can run the following BitSail command to cancel a job.

Note that:

  • <jobId> can be retrieved from the Flink JobManager, either from logs or the WebUI.
  • <cluster-id> can be retrieved from kubectl get deployment, as shown below:
kubectl get deployment
# expected output
NAME           READY   UP-TO-DATE   AVAILABLE   AGE
<cluster-id>   1/1     1            1           22s

bash ${BITSAIL_HOME}/bin/bitsail stop \
   --engine flink \
   --target kubernetes-application \
   --deployment-mode kubernetes-application \
   --execution-mode cancel \
   -p kubernetes.cluster-id=<cluster-id> \
   --job-id <jobId>

If users want to stop the whole application, they can run kubectl to delete the deployment:

kubectl delete deployments bitsail-job

Kubernetes Logs

There are three types of logs:

  1. BitSail client log: ${FLINK_HOME}/log/flink-xxx.log on client end
  2. BitSail JobManager log: /opt/flink/log/jobmanager.log on Kubernetes JobManager pod
  3. BitSail TaskManager log: /opt/flink/log/taskmanager.log on Kubernetes TaskManager pod

If you want to use kubectl logs <PodName> to view the logs, you must perform the following:

  1. Add a new appender to the log4j.properties in the Flink client.
  2. Add the following appenderRef to the rootLogger in log4j.properties: rootLogger.appenderRef.console.ref = ConsoleAppender.
  3. Stop and start your application again. Now you can use kubectl logs to view your logs.
# Log all infos to the console
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
# Reference the console appender from the root logger (step 2 above)
rootLogger.appenderRef.console.ref = ConsoleAppender

Users can dump JobManager/TaskManager logs on the client side by running kubectl commands:

# During job running
kubectl get pods # Will return jobmanager pod and taskmanager pod

kubectl logs -f <jobmanagerPod> # Will dump jobManager log

kubectl logs -f <taskmanagerPod>  # Will dump taskManager log

History Server

Flink has a history server that can be used to query the statistics of completed jobs after the corresponding Flink cluster has been shut down. Furthermore, it exposes a REST API that accepts HTTP requests and responds with JSON data. More information: https://nightlies.apache.org/flink/flink-docs-release-1.11/monitoring/historyserver.html

Start or stop the HistoryServer

${FLINK_HOME}/bin/historyserver.sh (start|start-foreground|stop)
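
The standalone HistoryServer process reads its settings from ${FLINK_HOME}/conf/flink-conf.yaml before it starts. A minimal sketch, assuming the same HDFS archive directory used in the command below already exists:

# point the job archiver and the history server at the same archive directory
echo "jobmanager.archive.fs.dir: hdfs:///completed-jobs/" >> ${FLINK_HOME}/conf/flink-conf.yaml
echo "historyserver.archive.fs.dir: hdfs:///completed-jobs/" >> ${FLINK_HOME}/conf/flink-conf.yaml
echo "historyserver.web.port: 8082" >> ${FLINK_HOME}/conf/flink-conf.yaml

# start the history server in the background
${FLINK_HOME}/bin/historyserver.sh start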

Then run the BitSail command line with the history server options configured:

bash ${BITSAIL_HOME}/bin/bitsail run \
   --engine flink \
   --target kubernetes-application \
   --deployment-mode kubernetes-application \
   --execution-mode run-application \
   -p kubernetes.cluster-id=<cluster-id> \
   -p kubernetes.jobmanager.service-account=<self-defined-service-account> \
   -p kubernetes.container.image=<CustomImage> \
   -p kubernetes.jobmanager.cpu=0.25 \
   -p kubernetes.taskmanager.cpu=0.5 \
   -p jobmanager.archive.fs.dir=hdfs:///completed-jobs/ \
   -p historyserver.web.address=0.0.0.0 \
   -p historyserver.web.port=8082 \
   -p historyserver.archive.fs.dir=hdfs:///completed-jobs/ \
   -p historyserver.archive.fs.refresh-interval=10000 \
   --conf-in-base64 <base64 conf>