
Airflow in Detail


What Airflow Can Do

Airflow is a workflow scheduling and management system: it organizes tasks into directed acyclic graphs (DAGs), schedules them, and dispatches them to executors/workers to run.

Installation and Usage

 

Minimal installation

pip install airflow

pip install "airflow[crypto, password]"

 

Once installed, Airflow is ready to use. By default it uses the SequentialExecutor, which runs tasks one at a time.

Initialize the database: airflow initdb

Start the web server: airflow webserver -p 8080

Start the scheduler: airflow scheduler

Or test the DAG given at the end of this article: airflow test ct1 print_date 2016-05-14

Configuring MySQL to Enable the LocalExecutor

Install MySQL database support

yum install mysql mysql-server

pip install airflow[mysql]

Set the password for the MySQL root user

Log in to MySQL: mysql -uroot

 

At the MySQL prompt, run the following SQL statements in order:

SET PASSWORD=PASSWORD("passwd");

FLUSH PRIVILEGES;

Create a user and a database

Create a database named airflow: CREATE DATABASE airflow;

 

Create a user ct with password 152108 and grant it full privileges on the airflow database:

GRANT all privileges on airflow.* TO 'ct'@'localhost' IDENTIFIED BY '152108';

FLUSH PRIVILEGES;

 

Create a database celery_result_airflow for celery_result_backend (this step was not used in the end and can be skipped):

mysql -uct -p152108

CREATE DATABASE celery_result_airflow;

Modify the Airflow config file for MySQL support

The airflow.cfg file is usually in the ~/airflow directory.

 

Change the database connection string:

sql_alchemy_conn = mysql://ct:152108@localhost/airflow

The fields correspond to:

dialect+driver://username:password@host:port/database

Initialize the database: airflow initdb

 

After the database initializes successfully, you can log in to MySQL and inspect the newly created tables.

mysql -uct -p152108

USE airflow;

SHOW TABLES;

 

CentOS 7 ships MariaDB instead of MySQL, but all the commands are the same:

yum install mariadb mariadb-server

systemctl start mariadb ==> start MariaDB

systemctl enable mariadb ==> start MariaDB at boot

mysql_secure_installation ==> set the root password and related options

mysql -uroot -p123456 ==> test the login

Using the LocalExecutor (Option 1)

Configure the LocalExecutor

 

Modify the Airflow config file

airflow.cfg is usually in the ~/airflow directory.

Change the executor to executor = LocalExecutor (see the snippet below).
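For reference, after these changes the relevant lines of airflow.cfg look roughly like this (executor and connection string taken from the MySQL setup above; adjust the credentials to your environment):

[core]
executor = LocalExecutor
sql_alchemy_conn = mysql://ct:152108@localhost/airflow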

Test

Create a few task files under ~/airflow/dags; see the TASK section below for details.

Run the following commands in order:

airflow initdb (skip if you ran it earlier)

airflow webserver --debug &

airflow scheduler

Open the web UI to watch the running processes.

Using the CeleryExecutor (Option 2)

Configure Celery + RabbitMQ support

Install Celery and RabbitMQ

pip install airflow[celery]

 

pip install airflow[rabbitmq]

 

Install Erlang and RabbitMQ (CentOS 6, REF):

wget https://packages.erlang-solutions.com/erlang/esl-erlang/FLAVOUR_1_general/esl-erlang_18.3-1~centos~6_amd64.rpm

yum install esl-erlang_18.3-1~centos~6_amd64.rpm

wget https://github.com/jasonmcintosh/esl-erlang-compat/releases/download/1.1.1/esl-erlang-compat-18.1-1.noarch.rpm

yum install esl-erlang-compat-18.1-1.noarch.rpm

wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.1/rabbitmq-server-3.6.1-1.noarch.rpm

yum install rabbitmq-server-3.6.1-1.noarch.rpm

 

Configure RabbitMQ

Start RabbitMQ: rabbitmq-server -detached

Start RabbitMQ at boot: chkconfig rabbitmq-server on

 

Configure RabbitMQ (REF):

rabbitmqctl add_user ct 152108

rabbitmqctl add_vhost ct_airflow

rabbitmqctl set_user_tags ct airflow

rabbitmqctl set_permissions -p ct_airflow ct ".*" ".*" ".*"

rabbitmq-plugins enable rabbitmq_management # optional, not used here

 

Modify the Airflow config file for Celery support

airflow.cfg is usually in the ~/airflow directory.

 

Change the executor to executor = CeleryExecutor

 

Change broker_url:

broker_url = amqp://ct:152108@localhost:5672/ct_airflow

Format explanation: transport://userid:password@hostname:port/virtual_host

 

Change celery_result_backend:

celery_result_backend = amqp://ct:152108@localhost:5672/ct_airflow

Format explanation: transport://userid:password@hostname:port/virtual_host

It can be the same as broker_url (the combined snippet below shows both).
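Putting the CeleryExecutor settings together, the relevant airflow.cfg lines look roughly like this (user, password, and vhost taken from the RabbitMQ setup above):

[core]
executor = CeleryExecutor

[celery]
broker_url = amqp://ct:152108@localhost:5672/ct_airflow
celery_result_backend = amqp://ct:152108@localhost:5672/ct_airflow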

Test

Start the web server: airflow webserver --debug

Start a Celery worker (must not be run as root): airflow worker

Start the scheduler: airflow scheduler

Tips:

While testing, watch the logs printed in the three windows running the commands above.

If something inexplicable happens, consider wiping the Airflow backend database with airflow resetdb.

After deleting a DAG file, its information may linger in the webserver; restart the webserver to clear it.

Stop the webserver: ps -ef|grep -Ei '(airflow-webserver)'| grep master | awk '{print $2}'|xargs -i kill {}

A script to start and restart the Airflow system

#!/bin/bash

#set -x
#set -e
set -u

# Colour variables used in usage(); default them to empty strings so that
# `set -u` does not abort when they are not defined in the calling environment.
txtcyn=${txtcyn:-}; txtrst=${txtrst:-}; bldblu=${bldblu:-}
txtbld=${txtbld:-}; bldred=${bldred:-}

usage()
{
cat <<EOF
${txtcyn}
Usage:

$0 options${txtrst}

${bldblu}Function${txtrst}:

This script is used to start or restart webserver service.

${txtbld}OPTIONS${txtrst}:
	-S	Start airflow system [${bldred}Default FALSE${txtrst}]
	-s	Restart airflow server only [${bldred}Default FALSE${txtrst}]
	-a	Restart all airflow programs including webserver, worker and
		scheduler. [${bldred}Default FALSE${txtrst}]
EOF
}

start_all=
server_only=
all=

while getopts "hs:S:a:" OPTION
do
	case $OPTION in
		h)
			usage
			exit 1
			;;
		S)
			start_all=$OPTARG
			;;
		s)
			server_only=$OPTARG
			;;
		a)
			all=$OPTARG
			;;
		?)
			usage
			exit 1
			;;
	esac
done

if [ -z "$server_only" ] && [ -z "$all" ] && [ -z "${start_all}" ]; then
	usage
	exit 1
fi

if [ "$server_only" == "TRUE" ]; then
	ps -ef | grep -Ei '(airflow-webserver)' | grep master | 
		awk '{print $2}' | xargs -i kill {}
	cd ~/airflow/
	nohup airflow webserver >webserver.log 2>&1 &
fi

if [ "$all" == "TRUE" ]; then
	ps -ef | grep -Ei 'airflow' | grep -v 'grep' | awk '{print $2}' | xargs -i kill {}
	cd ~/airflow/
	nohup airflow webserver >>webserver.log 2>&1 &
	nohup airflow worker >>worker.log 2>&1 &
	nohup airflow scheduler >>scheduler.log 2>&1 &
fi


if [ "${start_all}" == "TRUE" ]; then
	cd ~/airflow/
	nohup airflow webserver >>webserver.log 2>&1 &
	nohup airflow worker >>worker.log 2>&1 &
	nohup airflow scheduler >>scheduler.log 2>&1 &
fi

Other airflow.cfg changes

 

dags_folder

The dags_folder directory supports subdirectories and symlinks, so different DAGs can be organized by category.

 

Configure the email (SMTP) service

smtp_host = smtp.163.com

smtp_starttls = True

smtp_ssl = False

smtp_user = username@163.com

smtp_port = 25

smtp_password = userpasswd

smtp_mail_from = username@163.com
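These SMTP settings are what Airflow uses whenever a task sends notification mail. A minimal sketch of a DAG that exercises them (the dag_id, task_id, and addresses are only illustrative; the operator API matches the test files below):

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators import BashOperator

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2016, 5, 29),
    'email': ['username@163.com'],      # delivered through the smtp_* settings above
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('mail_demo', default_args=default_args, schedule_interval='@once')

# This task always fails, so a failure notification is sent by email.
t_fail = BashOperator(task_id='always_fails', bash_command='exit 1', dag=dag)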

 

Multi-user login setup (apparently only supported by the CeleryExecutor)

 

Change the following three lines in airflow.cfg:

authenticate = True

auth_backend = airflow.contrib.auth.backends.password_auth

filter_by_owner = True

 

Add a user (run the following in an interactive Python shell):

import airflow

from airflow import models, settings

from airflow.contrib.auth.backends.password_auth import PasswordUser

user = PasswordUser(models.User())

user.username = 'ehbio'

user.email = 'mail@ehbio.com'

user.password = 'ehbio'

session = settings.Session()

session.add(user)

session.commit()

session.close()

exit()

TASK

Parameter explanation

 

depends_on_past

Airflow assumes idempotent tasks that operate on immutable data chunks. It also assumes that every task instance (each task for each schedule) needs to run.

If your tasks need to be executed sequentially, you need to tell Airflow: use the depends_on_past=True flag on the tasks that require sequential execution.
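A minimal sketch of what that looks like in a DAG file (the dag_id and task_id are only illustrative; the operator API matches the test files at the end of this article):

from datetime import datetime

from airflow import DAG
from airflow.operators import BashOperator

dag = DAG('sequential_demo',
          start_date=datetime(2016, 5, 29),
          schedule_interval='@daily')

# With depends_on_past=True, the task instance for a given schedule only starts
# after the same task succeeded in the previous scheduled run.
t_load = BashOperator(
    task_id='load',
    bash_command='echo "load for {{ ds }}"',
    depends_on_past=True,
    dag=dag)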

 

Timestamps use a format like 2016-01-01T00:03:00.

 

If a command called from a task fails, you have to click run in the Graph View of the web UI to rerun it manually. To make it easier for a modified task to run smoothly, a compromise is:

Set email_on_retry: True

Set a fairly long retry_delay, so you can deal with the problem as soon as the email arrives

Then change it back to a short retry_delay, so the task restarts quickly

After writing a task DAG, always check it for syntax errors first: python dag.py

Test file 1: ct1.py

from airflow import DAG
from airflow.operators import BashOperator, MySqlOperator

from datetime import datetime, timedelta

one_min_ago = datetime.combine(datetime.today() -
	timedelta(minutes=1), datetime.min.time())

default_args = {
    'owner': 'airflow',         
		
    # For easy testing, start_date is usually the current time minus schedule_interval
    'start_date': datetime(2016, 5, 29, 8, 30), 
    'email': ['chentong_biology@163.com'],
    'email_on_failure': False, 
    'email_on_retry': False, 
	'depends_on_past': False, 
    'retries': 1, 
    'retry_delay': timedelta(minutes=5), 
    #'queue': 'bash_queue',
    #'pool': 'backfill', 
    #'priority_weight': 10, 
	#'end_date': datetime(2016, 5, 29, 11, 30), 
}

# The DAG id 'ct1' must be unique; it usually matches the file name
dag = DAG('ct1', default_args=default_args,
    schedule_interval="@once")

t1 = BashOperator(
    task_id='print_date', 
    bash_command='date', 
    dag=dag)

# cmd = "/home/test/test.bash "  -- note the trailing space
t2 = BashOperator(
    task_id='echo', 
    bash_command='echo "test" ', 
    retries=3, 
    dag=dag)

# Jinja-templated command (the standard Airflow tutorial example): echoes the
# execution date and the my_param value passed in below.
templated_command = """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7) }}"
        echo "{{ params.my_param }}"
    {% endfor %}
"""
t3 = BashOperator(
    task_id='templated', 
    bash_command=templated_command, 
    params={'my_param': "Parameter I passed in"}, 
    dag=dag)

# This means that t2 will depend on t1 running successfully to run
# It is equivalent to t1.set_downstream(t2)
t2.set_upstream(t1)


t3.set_upstream(t1)

# all of this is equivalent to
# dag.set_dependency('print_date', 'echo')
# dag.set_dependency('print_date', 'templated')
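Once this file is under ~/airflow/dags, it can be sanity-checked and exercised from the command line (the same commands mentioned earlier in the article):

python ct1.py                            # syntax check: should exit silently
airflow list_tasks ct1 --tree            # show the task tree of the DAG
airflow test ct1 print_date 2016-05-14   # run a single task instance without recording state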

Test file 2: ct2.py

from airflow import DAG
from airflow.operators import BashOperator

from datetime import datetime, timedelta

one_min_ago = datetime.combine(datetime.today() - timedelta(minutes=1),
                                  datetime.min.time())

default_args = {
    'owner': 'airflow',         
    'depends_on_past': True, 
    'start_date': one_min_ago,
    'email': ['chentong_biology@163.com'],
    'email_on_failure': True, 
    'email_on_retry': True, 
    'retries': 5, 
    'retry_delay': timedelta(hours=30), 
    #'queue': 'bash_queue',
    #'pool': 'backfill', 
    #'priority_weight': 10, 
	#'end_date': datetime(2016, 5, 29, 11, 30), 
}

dag = DAG('ct2', default_args=default_args,
    schedule_interval="@once")

t1 = BashOperator(
    task_id='run1', 
    bash_command='(cd /home/ct/test; bash run1.sh -f ct_t1) ', 
    dag=dag)

t2 = BashOperator(
    task_id='run2', 
    bash_command='(cd /home/ct/test; bash run2.sh -f ct_t1) ', 
    dag=dag)

t2.set_upstream(t1)

run1.sh

#!/bin/bash

#set -x
set -e
set -u

# Colour variables used in usage(); default them to empty strings so that
# `set -u` does not abort when they are not defined in the calling environment.
txtcyn=${txtcyn:-}; txtrst=${txtrst:-}; bldblu=${bldblu:-}
txtbld=${txtbld:-}; bldred=${bldred:-}

usage()
{
cat <<EOF
${txtcyn}
Usage:

$0 options${txtrst}

${bldblu}Function${txtrst}:

This script is used to do ********************.

${txtbld}OPTIONS${txtrst}:
	-f	Data file ${bldred}[NECESSARY]${txtrst}
	-z	Is there a header[${bldred}Default TRUE${txtrst}]
EOF
}

file=
header='TRUE'

while getopts "hf:z:" OPTION
do
	case $OPTION in
		h)
			usage
			exit 1
			;;
		f)
			file=$OPTARG
			;;
		z)
			header=$OPTARG
			;;
		?)
			usage
			exit 1
			;;
	esac
done

if [ -z "$file" ]; then
	usage
	exit 1
fi

cat <<END >$file
A
B
C
D
E
F
G
END

sleep 20s

run2.sh

#!/bin/bash

#set -x
set -e
set -u

# Colour variables used in usage(); default them to empty strings so that
# `set -u` does not abort when they are not defined in the calling environment.
txtcyn=${txtcyn:-}; txtrst=${txtrst:-}; bldblu=${bldblu:-}
txtbld=${txtbld:-}; bldred=${bldred:-}

usage()
{
cat <<EOF
${txtcyn}
Usage:

$0 options${txtrst}

${bldblu}Function${txtrst}:

This script is used to do ********************.

${txtbld}OPTIONS${txtrst}:
	-f	Data file ${bldred}[NECESSARY]${txtrst}
EOF
}

file=
header='TRUE'

while getopts "hf:z:" OPTION
do
	case $OPTION in
		h)
			usage
			exit 1
			;;
		f)
			file=$OPTARG
			;;
		?)
			usage
			exit 1
			;;
	esac
done

if [ -z "$file" ]; then
	usage
	exit 1
fi

awk 'BEGIN{OFS=FS="\t"}{print $0, "53"}' "$file" >"${file}.out"

Other issues

 

The DagRun object has room for a conf parameter that gets exposed in the “context” (templates, operators, …). That is the place where you would associate parameters to a specific run. For now this is only possible in the context of an externally triggered DAG run. The way the TriggerDagRunOperator works, you can fill in the conf param during the execution of the callable that you pass to the operator.

If you are looking to change the shape of your DAG through parameters, we recommend doing that using “singleton” DAGs (using a “@once” schedule_interval), meaning that you would write a Python program that generates multiple dag_ids, one for each run, probably based on metadata stored in a config file or elsewhere.

The idea is that if you use parameters to alter the shape of your DAG, you break some of the assumptions around continuity of the schedule. Things like visualizing the tree view or how to perform a backfill become unclear and mushy. So if the shape of your DAG changes radically based on parameters, we consider those to be different DAGs, and you generate each one in your pipeline file.

 

Completely delete all information about a DAG

set @dag_id = 'BAD_DAG';
delete from airflow.xcom where dag_id = @dag_id;
delete from airflow.task_instance where dag_id = @dag_id;
delete from airflow.sla_miss where dag_id = @dag_id;
delete from airflow.log where dag_id = @dag_id;
delete from airflow.job where dag_id = @dag_id;
delete from airflow.dag_run where dag_id = @dag_id;
delete from airflow.dag where dag_id = @dag_id;
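One way to run these statements is to feed them to the MySQL client against the Airflow backend configured earlier (the file name here is only illustrative):

mysql -uct -p152108 airflow < delete_bad_dag.sql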

Managing processes automatically with supervisord

[program:airflow_webserver]
command=/usr/local/bin/python2.7 /usr/local/bin/airflow webserver
user=airflow
environment=AIRFLOW_HOME="/home/airflow/airflow", PATH="/usr/local/bin:%(ENV_PATH)s"
stderr_logfile=/var/log/airflow-webserver.err.log
stdout_logfile=/var/log/airflow-webserver.out.log

[program:airflow_worker]
command=/usr/local/bin/python2.7 /usr/local/bin/airflow worker
user=airflow
environment=AIRFLOW_HOME="/home/airflow/airflow", PATH="/usr/local/bin:%(ENV_PATH)s"
stderr_logfile=/var/log/airflow-worker.err.log
stdout_logfile=/var/log/airflow-worker.out.log

[program:airflow_scheduler]
command=/usr/local/bin/python2.7 /usr/local/bin/airflow scheduler
user=airflow
environment=AIRFLOW_HOME="/home/airflow/airflow", PATH="/usr/local/bin:%(ENV_PATH)s"
stderr_logfile=/var/log/airflow-scheduler.err.log
stdout_logfile=/var/log/airflow-scheduler.out.log
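After placing these program blocks in the supervisord configuration (for example a file under /etc/supervisord.d/, depending on your setup), the three processes can be managed with supervisorctl:

supervisorctl reread                      # pick up the new program definitions
supervisorctl update                      # start the newly added programs
supervisorctl status                      # airflow_webserver / airflow_worker / airflow_scheduler
supervisorctl restart airflow_scheduler   # restart a single program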

In some situations, after modifying a DAG you may want to avoid running the tasks dated before the current date. backfill can fill in a specific time range while marking the tasks as successful instead of executing them:

airflow backfill -s START -e END --mark_success DAG_ID

Port forwarding

 

All of the configuration above was done on an internal server that only exposes SSH port 22. So I tried the same configuration on a second machine and set up port forwarding, mapping port 5672 on the external server to the RabbitMQ port on the internal server, then started Airflow and let it connect through the tunnel.

ssh -v -4 -Nf -R 5672:127.0.0.1:5672 aliyun

 

The command above follows this general format:

ssh -R <local port>:<remote host>:<remote port> <SSH hostname>

Here local port means the port opened on the SSH hostname (the remote end of the tunnel).

Remote connections from LOCALHOST:5672 forwarded to local address 127.0.0.1:5672

-v: turn on while testing

-4: if you hit the error "bind: Cannot assign requested address", this forces the ssh client to use IPv4

If you see "Warning: remote port forwarding failed for listen port 52698", close the other ssh tunnels.
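To verify the tunnel, you can check from the external server that the forwarded port is listening (aliyun is the SSH host alias used above; by default -R binds to the loopback address on the remote side):

ssh aliyun "netstat -lntp | grep 5672"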

Using Airflow across different machines

 

On the external server (used as the task-distribution server), install and configure the same Airflow modules as on the internal server.

 

Use the port forwarding described above so that the external server can reach RabbitMQ on port 5672 through the internal server's firewall.

 

With airflow webserver and airflow scheduler started on the external server and airflow worker started on the internal server, task execution state was lost. I am continuing to study Celery in order to solve this problem.

Installing Redis (not used in the end)

http://download.redis.io/releases/redis-3.2.0.tar.gz

tar xvzf redis-3.2.0.tar.gz, then run make

Start Redis: redis-server

Check that the background process exists: ps -ef | grep 'redis'

Check that port 6379 is listening: netstat -lntp | grep 6379

Possible reasons a task does not run as expected

Check whether start_date and end_date fall within a suitable time range

Check the output of airflow worker, airflow scheduler, and airflow webserver --debug to see whether any task is erroring out

Check the log output under the logs folder in the Airflow configuration directory

If none of the above reveals a problem, suspect stale or conflicting data; possible fixes include wiping the database (airflow resetdb) or giving the current DAG a new dag_id

References

https://pythonhosted.org/airflow/

http://kintoki.farbox.com/post/ji-chu-zhi-shi/airflow

http://www.jianshu.com/p/59d69981658a

http://bytepawn.com/luigi-airflow-pinball.html

https://github.com/airbnb/airflow

https://media.readthedocs.org/pdf/airflow/latest/airflow.pdf

http://www.csdn.net/article/1970-01-01/2825690

http://www.cnblogs.com/harrychinese/p/airflow.html

https://segmentfault.com/a/1190000005078547
