Airflow从入门到实战(万字长文)

文章目录

  • Airflow 基本概念
    • 概述
    • 名词
  • Airflow 安装
    • Airflow 官网
    • 安装 Python 环境
    • 安装 Miniconda
    • 创建 Python3.8 环境
    • 安装 Airflow
    • 启动停止脚本
  • 安装后的一些细节问题
    • 修改数据库为 MySQL
    • 修改执行器
  • 部署使用
    • 一些重要参数
    • 运行
      • 点击成功任务,查看日志,步骤如下
      • 查看 dag 图、甘特图
      • 查看脚本代码
    • Dag 任务操作
      • 删除 Dag 任务
      • 查看当前所有 dag 任务
  • 配置邮件服务器
    • 参数讲解
    • 启动

Airflow 基本概念

概述

Airflow 是一个以编程方式编写,安排和监视工作流的平台。

使用 Airflow 将工作流编写任务的有向无环图(DAG)。Airflow 计划程序在遵循指定的依赖项,同时在一组工作线程上执行任务。丰富的命令实用程序使在 DAG 上执行复杂的调度变的轻而易举。丰富的用户界面使查看生产中正在运行的管道,监视进度以及需要时对问题进行故障排除变的容易。

名词

(1)Dynamic:Airflow 配置需要实用 Python,允许动态生产管道。这允许编写可动态。这允许编写可动态实例化管道的代码。
(2)Extensible:轻松定义自己的运算符,执行程序并扩展库,使其适合于您的环境。
(3)Elegant:Airlfow 是精简的,使用功能强大的 Jinja 模板引擎,将脚本参数化内置于 Airflow 的核心中。
(4)Scalable:Airflow 具有模板块架构,并使用消息队列来安排任意数量的工作任务。

Airflow 安装

Airflow 官网

https://airflow.apache.org

安装 Python 环境

Airflow 是由 Python 语言编写的 Web 应用,要求 Python3.8 的环境。

安装 Miniconda

conda 是一个开源的包、环境管理器,可以用于在同一个机器上安装不同 Python 版本的软件包及其依赖,并能够在不同的 Python 环境之间切换,Anaconda 包括 Conda、Python 以及一大堆安装好的工具包,比如:numpy、pandas 等,Miniconda 包括 Conda、Python。

此处,我们不需要如此多的工具包,故选择 MiniConda。

1)下载 Miniconda(Python3 版本)

下载地址:https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh

2)安装 Miniconda
(1)执行以下命令进行安装,并按照提示操作,直到安装完成。

[root@hadoop102 software]# mkdir airflow
[root@hadoop102 software]# cd airflow/
--将安装包放入此目录中
[root@hadoop102 airflow]$ bash Miniconda3-latest-Linux-x86_64.sh

(2)在安装过程中,出现以下提示时,可以指定安装路径

(3)出现以下字样,即为安装完成

3)加载环境变量配置文件,使之生效

[root@hadoop102 airflow]# source ~/.bashrc

4)取消激活 base 环境

Miniconda 安装完成后,每次打开终端都会激活其默认的 base 环境,我们可通过以下命令,禁止激活默认 base 环境。

[root@hadoop102 airflow]# conda config --set auto_activate_base false

创建 Python3.8 环境

1)配置 conda 国内镜像

(base) [root@hadoop102 ~]$ conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free
(base) [root@hadoop102 ~]$ conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/main
(base) [root@hadoop102 ~]$ conda config --set show_channel_urls yes

2)创建 Python3.8 环境

(base) [root@hadoop102 ~]$ conda create --name airflow python=3.8

说明:conda 环境管理常用命令
创建环境:conda create -n env_name
查看所有环境:conda info –envs
删除一个环境:conda remove -n env_name –all

3)激活 airflow 环境

(base) [root@hadoop102 ~]$ conda activate airflow

激活后效果如下图所示

[root@hadoop102 software]$ conda activate airflow
(airflow) [atguigu@hadoop102 software]$ 

说明:退出当前环境。

(airflow) [atguigu@hadoop102 ~]$ conda deactivate

4)执行 python -V 命令查看 python 版本

(airflow) [root@hadoop102 software]$ python -V
Python 3.8.13

安装 Airflow

1)更改 pip 的源

[root@hadoop102 software]$ conda activate airflow
(airflow) [root@hadoop102 software]$ pip install numpy -i https://pypi.tuna.tsinghua.edu.cn/simple
(airflow) [root@hadoop102 software]$ sudo mkdir ~/.pip
(airflow) [root@hadoop102 software]$ sudo vim ~/.pip/pip.conf

添加以下内容

[global]
index-url = https://pypi.tuna.tsinghua.edu.cn/simple
[install]
trusted-host = https://pypi.tuna.tsinghua.edu.cn

2)安装 airflow

(airflow) [root@hadoop102 software]$ pip install "apache-airflow==2.4.3"

3)初始化 airflow

(airflow) [root@hadoop102 software]$ airflow db init

4)查看版本

(airflow) [root@hadoop102 software]$ airflow version 
2.4.3

5)airflow 安装好存放路径

(airflow) [root@hadoop102 airflow]$ pwd
/root/airflow

6)创建账号

(airflow) [root@hadoop102 airflow]$ airflow users create \
--username admin \
--firstname bigdata \
--lastname bigdata \
--role Admin \
--email 1127914080@qq.com

此时会让你输入密码,这里笔者的密码设置为123456

7)启动 airflow 调度

(airflow) [root@hadoop102 airflow]$ airflow scheduler -D

8)启动 airflow web 服务,启动后浏览器访问 http://hadoop102:8080

(airflow) [root@hadoop102 airflow]$ airflow webserver -p 8080 -D

则已经进入了airflow的页面。

启动停止脚本

https://blog.csdn.net/weixin_45417821/article/details/

安装后的一些细节问题

页面中显示了两个问题,第一个问题是希望将元数据存放在MySQL或者PostgresSQL中,第二个问题是不建议用这个执行器,接下来我们进行改进。

修改数据库为 MySQL

1)在 MySQL 中建库

mysql> CREATE DATABASE airflow_db CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;

2)如果报错 Linux error:1425F102:SSL routines:ssl_choose_client_version:unsupported protocol,可以关闭 MySQL 的 SSL 证书

查看 SSL 是否开启 YES 为开启

mysql> SHOW VARIABLES LIKE '%ssl%';
+---------------+-----------------+
| Variable_name | Value           |
+---------------+-----------------+
| have_openssl  | YES             |
| have_ssl      | YES             |
| ssl_ca        | ca.pem          |
| ssl_capath    |                 |
| ssl_cert      | server-cert.pem |
| ssl_cipher    |                 |
| ssl_crl       |                 |
| ssl_crlpath   |                 |
| ssl_key       | server-key.pem  |
+---------------+-----------------+
9 rows in set (0.02 sec)

3)修改配置文件 my.cnf,加入以下内容:

vim /etc/my.cnf
# disable_ssl
skip_ssl

并重启mysql

sudo systemctl restart mysqld

4)添加 python 连接的依赖:官网介绍的方法有两种,这里我们选择下面的连接器。

官网连接器地址:https://airflow.apache.org/docs/apache-airflow/2.4.3/howto/set-up-database.html

(airflow) [root@hadoop102 airflow]$ pip install mysql-connector-python

5)修改 airflow 的配置文件:

(airflow)[root@hadoop102 ~]$ cd /root/airflow
[database]
# The SqlAlchemy connection string to the metadata database.
# SqlAlchemy supports many different database engines.
# More information here:
# http://airflow.apache.org/docs/apache-airflow/stable/howto/set-up-database.html#database-uri
#sql_alchemy_conn = sqlite:home/atguigu/airflow/airflow.db
sql_alchemy_conn = mysql+mysqlconnector://root:000000@hadoop102:3306/airflow_db

6)关闭 airflow,初始化后重启:

(airflow) [root@hadoop102 ~]$ af.sh stop
(airflow) [root@hadoop102 airflow]$ airflow db init
(airflow) [root@hadoop102 ~]$ af.sh start

7)初始化报错 1067 – Invalid default value for ‘update_at’:
原因:字段 ‘update_at’ 为 timestamp 类型,取值范围是:1970-01-01 00:00:00 到2037-12-31 23:59:59(UTC +8 北京时间从 1970-01-01 08:00:00 开始),而这里默认给了空值,所以导致失败。

推荐修改 mysql 存储时间戳格式:

mysql> set GLOBAL sql_mode ='STRICT_TRANS_TABLES,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION'

重启 MySQL 会造成参数失效,推荐将参数写入到配置文件 my.cnf 中。

vim /etc/my.cnf

sql_mode = STRICT_TRANS_TABLES,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION

并重启mysql

sudo systemctl restart mysqld

再次初始化查看效果:命令 :airflow db init

8)重新创建账号登录:

(airflow) [root@hadoop102 airflow]$ airflow users create \
--username admin \
--firstname bigdata \
--lastname bigdata \
--role Admin \
--email 1127914080@qq.com

密码依然是123456

启动airflow ,并打开查看

af.sh start

此时发现已经成功进入了 ,并且数据库方面的提示已经消失了

修改执行器

官网不推荐在开发中使用顺序执行器,会造成任务调度阻塞。

关闭airflow,修改配置文件

[root@hadoop102 bin]# af.sh stop
(airflow) [root@hadoop102 airflow]# vim airflow.cfg 

添加如下内容

[core]
# The executor class that airflow should use. Choices include
# ``SequentialExecutor``, ``LocalExecutor``, ``CeleryExecutor``, ``DaskExecutor``,
# ``KubernetesExecutor``, ``CeleryKubernetesExecutor`` or the
# full import path to the class when using a custom executor.
executor = LocalExecutor

可以使用官方推荐的几种执行器,也可以自定义。这里我们选择本地执行器即可。之后再次启动

[root@hadoop102 bin]# af.sh start

hadoop102:8080

可以发现已经没有警告提示了。

部署使用

1)测试环境启动
本次测试使用的是 spark 的官方案例,所有需要启动 hadoop 和 spark 的历史服务器。(这里笔者已经配置好脚本了)

[root@hadoop102 bin]$ hdp.sh start

2)查看 Airflow 配置文件

(alrflow) [root@airflow work-py]# vim ~/airflow/airflow.cfg


代码仓库的目录,别忘了airflow是用代码进行调度的

3)编写.py 脚本,创建 work-py 目录用于存放 python 调度脚本

(airflow) [root@hadoop102 airflow]$ mkdir ~/airflow/dags
(airflow) [root@hadoop102 airflow]$ cd dags/
(airflow) [root@hadoop102 dags]$ vim wordcount.py

添加如下内容

#!/usr/bin/python
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
 # 用户 test_owner DAG下的所有者
 'owner': 'luanhao',
 # 是否开启任务依赖
 'depends_on_past': True, 
 # 邮箱
 'email': ['1127914080@qq.com'],
 # 启动时间
 'start_date':datetime(2023,1,19),
 # 出错是否发邮件报警
 'email_on_failure': False,
 # 重试是否发邮件报警
 'email_on_retry': False,
 # 重试次数
 'retries': 1,
 # 重试时间间隔
 'retry_delay': timedelta(minutes=5),
}
# 声明任务图
# test代表任务名称(可以修改其他名称,这里我们用wordcount)
dag = DAG('wordcount', default_args=default_args, schedule_interval=timedelta(days=1))
# 创建单个任务
t1 = BashOperator(
 # 任务 id
 task_id='dwd',
 # 任务命令 使用Spark的wordcount
 bash_command='ssh hadoop102 "/opt/module/spark-3.2-yarn/bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn /opt/module/spark-3.2-yarn/examples/jars/spark-examples*.jar 10 "',
 # 重试次数
 retries=1,
 # 把任务添加进图中
 dag=dag)

t2 = BashOperator(
 task_id='dws',
 bash_command='ssh hadoop102 "/opt/module/spark-3.2-yarn/bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn /opt/module/spark-3.2-yarn/examples/jars/spark-examples*.jar 10 "',
 retries=1,
 dag=dag)

t3 = BashOperator(
 task_id='ads',
 bash_command='ssh hadoop102 "/opt/module/spark-3.2-yarn/bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn /opt/module/spark-3.2-yarn/examples/jars/spark-examples*.jar 10 "',
 retries=1,
 dag=dag)
 
# 设置任务依赖
t2.set_upstream(t1)
t3.set_upstream(t2)

一些重要参数

必须导包

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args 设置默认参数。
depends_on_past 是否开启任务依赖。
schedule_interval 调度频率。
retries 重试次数 。
start_date 开始时间。
BashOperator 具体执行任务,如果为 true 前置任务必须成功完成才会走下一个依赖任务,如果为 false 则忽略是否成功完成。
task_id 任务唯一标识(必填)。
bash_command 具体任务执行命令。
set_upstream 设置依赖 如上图所示 ads 任务依赖 dws 任务依赖 dwd 任务。

出现wordcount,

运行

点击成功任务,查看日志,步骤如下

日志内容如下:

查看 dag 图、甘特图

查看脚本代码

Dag 任务操作

删除 Dag 任务

主要删除 DAG 任务不会删除底层文件,过一会还会自动加载回来。

查看当前所有 dag 任务

# 查看所有任务
(airflow) [root@hadoop102 airflow]$ airflow list_dags
# 查看单个任务
(airflow) [root@hadoop102 airflow]$ airflow tasks list wordcount --tree

配置邮件服务器

1)保证邮箱已开 SMTP 服务(这里我们使用QQ邮箱,当然其他邮箱也可以)

2)修改 airflow 配置文件,用 stmps 服务对应 587 端口

(airflow) [root@hadoop102 airflow]$ vim ~/airflow/airflow.cfg
smtp_host = smtp.qq.com
smtp_starttls = True
smtp_ssl = False
smtp_user = 1127914080@qq.com
# smtp_user =
smtp_password = yyyfjkoqvsnzhhgb
# smtp_password =
smtp_port = 587
smtp_mail_from = 1127914080@qq.com

3)重启 airflow

[root@hadoop102 bin]$ af.sh stop
[root@hadoop102 bin]$ af.sh start

4)新增 workflows.py 脚本,并加入邮箱功能

(airflow) [root@hadoop102 dags]# vim workflows.py

#!/usr/bin/python
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.email_operator import EmailOperator
from datetime import datetime, timedelta
default_args = {
 # 用户
 'owner': 'luanhao',
 # 是否开启任务依赖
 'depends_on_past': True, 
 # 邮箱
 'email': ['1127914080@qq.com'],
 # 启动时间
 'start_date':datetime(2023,1,19),
 # 出错是否发邮件报警
 'email_on_failure': True,
 # 重试是否发邮件报警
 'email_on_retry': True,
 # 重试次数
 'retries': 1,
 # 重试时间间隔
 'retry_delay': timedelta(minutes=5),
}

# 声明任务图
dag = DAG('workflows', default_args=default_args, schedule_interval=timedelta(days=1))
# 创建单个任务
t1 = BashOperator(
 # 任务 id
 task_id='dwd',
 # 任务命令
 bash_command='ssh hadoop102 "/opt/module/spark-3.2-yarn/bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn /opt/module/spark-3.2-yarn/examples/jars/spark-examples*.jar 10 "',
 # 重试次数
 retries=1,
 # 把任务添加进图中
 dag=dag)

t2 = BashOperator(
 task_id='dws',
 bash_command='ssh hadoop102 "/opt/module/spark-3.2-yarn/bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn /opt/module/spark-3.2-yarn/examples/jars/spark-examples*.jar 10 "',
 retries=3,
 dag=dag)

t3 = BashOperator(
 task_id='ads',
 bash_command='ssh hadoop102 "/opt/module/spark-3.2-yarn/bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn /opt/module/spark-3.2-yarn/examples/jars/spark-examples*.jar 10 "',
 retries=3,
 dag=dag)

email=EmailOperator(
 # 邮箱id
 task_id="email",
 # 发送给xxx
 to="1063182043@qq.com ",
 # 邮箱主题
 subject="hi,你好啊,我是你来自远方的最亲密的伙伴",
 # 发送内容
 html_content="<h1>后天就要过年了,明天该剪头了撒</h1>",
 # 抄送给xxx
 cc="1127914080@qq.com ",
 dag=dag)

# 任务之间相互依赖
t2.set_upstream(t1)
t3.set_upstream(t2)
email.set_upstream(t3)

参数讲解

task_id=“email” : 邮箱的id
to=”1063182043@qq.com ” :发送给对方
subject=“hi,你好啊,我是你来自远方的最亲密的伙伴”:邮箱的主题描述
html_content= “< h1 >后天就要过年了,明天该剪头了撒</ h1 >”:邮箱的内容
cc=”1127914080@qq.com ” : 邮箱的抄送内容,抄送给对方

启动

运行测试

文章出处登录后可见!

已经登录?立即刷新

共计人评分,平均

到目前为止还没有投票!成为第一位评论此文章。

(0)
扎眼的阳光的头像扎眼的阳光普通用户
上一篇 2023年12月11日
下一篇 2023年12月11日

相关推荐