#!/usr/bin/env bash

# Creates a Dataproc cluster running a Flink YARN session and tunnels the
# Flink jobserver to localhost, for running Beam Python pipelines.
# set -x
set -e

# --- Cluster / image configuration: edit these before running. ---
USER=$(whoami)
CLUSTER_NAME="$USER-flink-156"
NUM_WORKERS=2
FLINK_VERSION=1.5.6
WORK_DIR="gs://clouddfe-$USER/tmp"
CLOUD_WORKER_IMAGE="gcr.io/dataflow-build/$USER/beam_fnapi_python:latest"
TASK_MANAGER_MEM=10240
FLINK_LOCAL_PORT=8081
TASK_MANAGER_SLOTS=1
DATAPROC_VERSION=1.2

# --- Derived values; normally no need to edit below this line. ---
MASTER_NAME="$CLUSTER_NAME-m"
FLINK_INIT="$WORK_DIR/flink/flink-init-dataproc.sh"
DOCKER_INIT="$WORK_DIR/flink/docker-init.sh"
LOCAL_WORKER_IMAGE="$USER-docker-apache.bintray.io/beam/python:latest"
FLINK_DOWNLOAD_URL="http://archive.apache.org/dist/flink/flink-$FLINK_VERSION/flink-$FLINK_VERSION-bin-hadoop28-scala_2.11.tgz"

# Populated by get_leader() once the YARN session is running.
YARN_APPLICATION=""
YARN_APPLICATION_MASTER=""

# Returns success (0) iff this VM's Dataproc role metadata is "Master".
# Intended to run on a cluster VM where the metadata helper exists.
function is_master() {
  # Declaration+assignment on one line deliberately masks a failing metadata
  # lookup under `set -e`; role is then empty -> treated as "not master".
  local role="$(/usr/share/google/get_metadata_value attributes/dataproc-role)"
  [[ "$role" == 'Master' ]]
}

# Finds the single Flink YARN application running on the cluster and stores
# its id and application-master address in the globals YARN_APPLICATION and
# YARN_APPLICATION_MASTER. Exits 1 if the cluster does not list exactly one
# application.
function get_leader() {
  local i=0
  # Indexed (not associative) arrays: keys are the integer counter $i.
  local -a application_ids
  local -a application_masters
  echo "Yarn Applications"
  while read -r line; do
    echo "$line"
    # First whitespace-delimited column of `yarn application -list` output
    # is the application id.
    application_ids[$i]=$(echo "$line" | sed "s/ .*//")
    # The application master is the first field that starts with the
    # cluster name (a host:port within the cluster).
    application_masters[$i]=$(echo "$line" | sed "s/.*$CLUSTER_NAME/$CLUSTER_NAME/" | sed "s/ .*//")
    i=$((i + 1))
  done <<< "$(gcloud compute ssh yarn@"$MASTER_NAME" --command="yarn application -list" | grep "$CLUSTER_NAME")"

  if [[ "$i" != 1 ]]; then
    echo "Multiple applications found. Make sure that only 1 application is running on the cluster."
    for app in "${application_ids[@]}"; do
      echo "$app"
    done

    echo "Execute 'gcloud compute ssh yarn@$MASTER_NAME --command=\"yarn application -kill <APP_NAME>\"' to kill the yarn application."
    exit 1
  fi

  YARN_APPLICATION=${application_ids[0]}
  YARN_APPLICATION_MASTER=${application_masters[0]}
  echo "Using Yarn Application $YARN_APPLICATION $YARN_APPLICATION_MASTER"
}

# Tags the locally built Beam Python worker image and pushes it to GCR so
# cluster VMs can pull it.
function upload_worker_image() {
  echo "Tagging worker image $LOCAL_WORKER_IMAGE to $CLOUD_WORKER_IMAGE"
  docker tag "$LOCAL_WORKER_IMAGE" "$CLOUD_WORKER_IMAGE"
  echo "Pushing worker image $CLOUD_WORKER_IMAGE to GCR"
  docker push "$CLOUD_WORKER_IMAGE"
}

# Pre-pulls the worker image on every cluster VM (master + all workers),
# fanning the SSH commands out in parallel.
function pull_worker_image() {
  # Matches "<cluster>-m" (master) and "<cluster>-w-<id>" (workers) in the
  # first column of `gcloud compute instances list`. Computed once instead
  # of running the listing pipeline twice.
  local instances
  instances=$(gcloud compute instances list | sed "s/ .*//" | grep "^\($CLUSTER_NAME-m$\|$CLUSTER_NAME-w-[a-zA-Z0-9]*$\)")
  echo "Pulling worker image $CLOUD_WORKER_IMAGE on workers $instances"
  echo "$instances" | xargs -I INSTANCE -P 100 gcloud compute ssh yarn@INSTANCE --command="docker pull $CLOUD_WORKER_IMAGE"
}

# Starts a detached (-d) Flink session on YARN, sized by the NUM_WORKERS /
# TASK_MANAGER_MEM / TASK_MANAGER_SLOTS globals, named "flink_yarn".
function start_yarn_application() {
  echo "Starting yarn application on $MASTER_NAME"
  execute_on_master "/usr/lib/flink/bin/yarn-session.sh -n $NUM_WORKERS -tm $TASK_MANAGER_MEM -s $TASK_MANAGER_SLOTS -d -nm flink_yarn"
}

# Runs a command line on the cluster master over SSH as the yarn user.
#   $1 - command to execute remotely.
function execute_on_master() {
  gcloud compute ssh yarn@"$MASTER_NAME" --command="$1"
}

# Downloads the Flink distribution and stages it, together with this
# script and the init actions, into $WORK_DIR/flink on GCS.
function upload_resources() {
  # Declaration split from assignment so a mktemp failure aborts the
  # script under `set -e` instead of being masked by `local`.
  local tmp_folder
  tmp_folder=$(mktemp -d -t flink_tmp_XXXX)

  echo "Downloading flink at $tmp_folder"
  wget -P "$tmp_folder" "$FLINK_DOWNLOAD_URL"

  echo "Uploading resources to GCS $WORK_DIR"
  cp ./create_cluster.sh "$tmp_folder"
  cp ./docker-init.sh "$tmp_folder"
  cp ./flink-init-dataproc.sh "$tmp_folder"

  gsutil cp -r "$tmp_folder"/* "$WORK_DIR/flink"

  rm -r "$tmp_folder"
}

# Opens an SSH tunnel from localhost to the Flink application master:
# forwards $FLINK_LOCAL_PORT to the jobmanager web/REST endpoint and the
# jobmanager RPC port to itself, plus a SOCKS proxy on 1080. Blocks until
# the SSH session ends.
function start_tunnel() {
  local job_server_config
  job_server_config=$(execute_on_master "curl -s \"http://$YARN_APPLICATION_MASTER/jobmanager/config\"")
  local key="jobmanager.rpc.port"
  local yarn_application_master_host
  yarn_application_master_host=$(echo "$YARN_APPLICATION_MASTER" | cut -d ":" -f1)

  # Extract the RPC port from the jobmanager's JSON config.
  # NOTE(review): the inline snippet is Python 2 (print statement, u''
  # literal); it assumes "python" on the master's PATH is python2 — confirm.
  jobmanager_rpc_port=$(echo "$job_server_config" | python -c "import sys, json; print [ e['value'] for e in json.load(sys.stdin) if e['key'] == u'$key'][0]")
  # Keep this string identical to the command executed below (including
  # "yarn@") so the printed re-establish instruction actually works.
  local tunnel_command="gcloud compute ssh yarn@$MASTER_NAME -- -L $FLINK_LOCAL_PORT:$YARN_APPLICATION_MASTER -L $jobmanager_rpc_port:$yarn_application_master_host:$jobmanager_rpc_port -D 1080"
  local kill_command="gcloud compute ssh yarn@$MASTER_NAME --command=\"yarn application -kill $YARN_APPLICATION\""
  echo "===================Closing the shell does not stop the yarn application==================="
  echo "Execute \"$kill_command\" to kill the yarn application."
  echo "Starting tunnel \"$tunnel_command\""
  echo "Exposing flink jobserver at localhost:$FLINK_LOCAL_PORT"
  gcloud compute ssh yarn@"$MASTER_NAME" -- -L "$FLINK_LOCAL_PORT:$YARN_APPLICATION_MASTER" -L "$jobmanager_rpc_port:$yarn_application_master_host:$jobmanager_rpc_port" -D 1080
  echo "===================Closing the shell does not stop the yarn application==================="
  echo "Execute \"$kill_command\" to kill the yarn application."
  echo "To re-establish tunnel use \"$tunnel_command\""
}

# Creates the Dataproc cluster with the Flink and Docker init actions, then
# waits briefly for the VMs to settle before the YARN session is started.
function create_cluster() {
  echo "Starting dataproc cluster."
  gcloud dataproc clusters create "$CLUSTER_NAME" --num-workers="$NUM_WORKERS" --initialization-actions "$FLINK_INIT,$DOCKER_INIT" --metadata "flink_version=$FLINK_VERSION,work_dir=$WORK_DIR/flink" --image-version="$DATAPROC_VERSION"
  echo "Sleeping for 30 sec"
  sleep 30s
}

# Orchestrates the full setup: stage resources in GCS, create the Dataproc
# cluster, start the Flink YARN session, locate its application master,
# distribute the Beam worker image, and open the local SSH tunnel.
function main() {
  upload_resources
  create_cluster # Comment this line to use existing cluster.
  start_yarn_application # Comment this line if yarn application is already running on the cluster.
  get_leader
  upload_worker_image
  pull_worker_image
  start_tunnel
}

main "$@"