一小时搭建一个云原生机器学习平台

在之前的文章中,我们有介绍过 MLOps 相关的概念和流程。最近几年这个方向的发展非常快,诞生了很多初创公司,例如可以参考 Huyen Chip 的这篇 blog,描绘了整体的市场情况。今天我们来看下如何利用一些云计算的平台和开源框架来快速的搭建一个“云上机器学习平台”。

这篇文章中的很多内容受到了 这个项目 的启发。整体完成的框架结构大致如下所示:

v2-5b6358676ed895bff5b5636d8f3a332f_1440w

开发环境

我目前主要用 Windows 系统来进行日常工作,所以本文会基于 Windows 的开发环境来演示。在开始之前,你需要先配置好基础的软件依赖,例如 Windows Terminal,Visual Studio Code,scoop/choco 包管理工具,Python 基础环境等。建议可以参考 我的这篇回答

基础设施

最近发现了一个非常有意思的 infrastructure as code 的框架,名叫 pulumi。之前对这个领域的感觉是需要写一大堆的 Shell/Python 脚本,或者管理大量的 yaml 文件(例如 terraform 等)来构建一套集群环境,涉及到很多流程串联的 glue code,还需要自己管理各种失败,清理,重试等场景,非常繁琐复杂。看到在 pulumi 上用代码来创建一个 eks 的集群,只需要几行代码:

import * as eks from "@pulumi/eks";

// Create an EKS cluster with the default configuration.
const cluster = new eks.Cluster("my-cluster");

// Export the cluster's kubeconfig.
export const kubeconfig = cluster.kubeconfig;

就搞定了!非常令人震惊!想起当年为了评估 Kubeflow 折腾 K8s 的血泪史,要是早知道这个工具,对生产力的提升绝对是巨大的。

因为使用 pulumi 来做整个底层 infra 的搭建,所以对应的云计算平台我们选择了比较成熟的 AWS。当然有些组件例如 feast(一个开源 feature store 框架)目前只支持 GCP 上的一些数据服务 (BigQuery, redis 等),我们完全可以用 pulumi 来管理整个混合云架构。

机器学习平台

在这个演示项目中,我们针对各种机器学习需求选用了各种开源框架来进行搭建,包括:

  • 数据管理:使用了 DVC 进行数据版本管理,当然也可以选择 Pachyderm,Hudi 等更大型的框架。
  • 特征管理:目前还没有实现。这方面开源框架的选择主要是 Feast 和 Hopsworks。
  • 流程编排:我们选择了比较好上手的 Prefect 来进行各种流程的编排,这里可以选的就有很多了,从老牌的 Luigi,Airflow 到新一代的 Flyte,Kubeflow 等。
  • 实验记录:选用了目前最流行的 MLFlow,主要也是因为可以自行部署。其它像 Weights and Biases,Neptune,谷歌的 MLMD 等也都是不错的选择。
  • 参数管理:选用了来自 Facebook 的 Hydra 进行尝试。
  • 服务打包:我们使用 Docker 对机器学习服务进行打包,其中的模型部分也会从 MLFlow 中载入。
  • 模型服务:使用最简单的 K8s 托管的容器进行服务,web 框架选择了 FastAPI。
  • 网络路由:选用了更加“现代化”的 Traefik,统一在 pulumi 中配置管理,非常方便。
  • 持续集成:直接利用 Github actions,代码提交后自动执行测试与部署,流畅。

后面我们会详细介绍这些组件的构建使用方式。

部署流程

云服务设置

在 AWS 和 pulumi 上,我们都需要创建相应的账号,并获取到 access token。以 AWS 为例,可以在 IAM 中生成访问密钥,然后在命令行中进行配置:

v2-eb6359d16cc5cb610096a3df9ed62f10_1440w

这里默认我都使用的是 PowerShell 7:

$env:AWS_ACCESS_KEY_ID = "<YOUR_ACCESS_KEY_ID>"; $env:AWS_SECRET_ACCESS_KEY = "<YOUR_SECRET_ACCESS_KEY>"

安装各类工具包

一开始我使用的是 scoop,但后来发现有不少工具默认给的包管理工具都是 choco,所以下面的代码中会发现两者混合着使用。如果想重复这个流程,建议可以统一用 choco。如果是 Mac 上基本没啥争议,用 brew 就好。

安装 pulumi,node,aws 等依赖。另外 Docker Desktop 也是需要的 :)

scoop install pulumi
scoop install nodejs
msiexec.exe /i https://awscli.amazonaws.com/AWSCLIV2.msi
choco install -y aws-iam-authenticator
choco install -y kubernetes-helm

对于 Python 的包管理,我尝试了一下比较新的 PDM,感觉相当好用,比起使用 pip,conda 等手动管理环境方便很多。不过 PDM 中支持的 PEP 582 还有些前卫,PyCharm 等 IDE 中的支持也不够好,所以如果成熟项目可能更建议使用 poetry

安装 PDM:

(Invoke-WebRequest -Uri https://raw.githubusercontent.com/pdm-project/pdm/main/install-pdm.py -UseBasicParsing).Content | python -

构建基础设施

Pulumi 虽然也提供了 Python 接口,但通过翻阅文档发现大量的功能支持都是在 TypeScript 接口中支持,所以构建代码我们也选择了 TypeScript 来写。从头创建一个 Pulumi 项目:

pulumi new aws-typescript

根据提示输入各种信息即可。我们在这个项目中会创建如下内容:

  • 基于 eks 的 K8s 集群,用于运行各种服务如 MLFlow,模型预测等。
  • 一个 postgresql 服务,供 MLFlow 存储元数据用。
  • 几个 s3 bucket,分别用于 DVC 数据存储,MLFlow artifacts 存储,以及用户自定义存储。
  • 通过 helm 来部署 MLFlow
  • 创建 service account,控制 MLFlow 和模型服务来访问 s3 的权限
  • 通过 helm 来部署 Traefik,并配置转发规则
  • 利用 Route 53 服务来绑定域名

在开发 infra 定义过程中,可能需要安装一些额外的 pulumi 包,用npm install命令即可。

写完相关代码后,直接运行pulumi up就可以进行整体部署了!

v2-0bb6f48ca572ec423901cf6ca16bd5e0_1440w

短短百来行代码,pulumi 就会帮我们部署这 60 多个组件,而且还会自动处理他们的各种依赖层级关系。如果部署过程中出现了失败,也不要慌,再跑一次pulumi up,系统会自动帮我们判断哪些组件已经创建,并完成剩余部分的工作,跟 K8s 的声明状态的概念很相像。

整个启动过程也很快,在我这边测试大概会花 15-20 分钟。完成之后,我们先来测试一下 K8s 的访问:

# 将 kubeconfig 加载到配置中
pulumi stack output kubeconfig > ~\.kube\config

(这个 stack output 也会用于后面获取 MLFlow,s3 地址等)

然后执行 kubectl 就能看到云端的双节点 K8s 已经在正常运行啦:

v2-3a8dbee91a026b60d4fa6482004d7e82_1440w

还可以通过域名访问下 MLFlow,确认一下已经可以正常运作。

如果需要删除整个部署,只需要执行pulumi destroy即可。如果有任何更改,修改完代码执行pulumi up,系统就会自动进行对比和更新。比如我在完成初始化部署后,又加了些代码来部署 Prefect:

v2-e8c7f4ad368b4d7dafb635f8779185f8_1440w

Python 环境搭建

接下来我们创建一个新的 Python 项目,用于机器学习平台逻辑的开发。

执行pdm init,然后根据提示来完成一系列项目信息的配置。接下来如果需要添加依赖,只需要简单的运行pdm add dvc[s3]这样的命令即可。在运行完成后,pdm 会自动帮用户维护好pyproject.toml文件,而不需要像之前执行pip freeze之类的命令。这个项目依赖的内容生成出来如下:

dependencies = [
    "lightgbm~=3.2",
    "mlflow~=1.18",
    "boto3~=1.17",
    "fastapi~=0.65",
    "uvicorn~=0.14",
    "dvc[s3]~=2.3",
    "pyarrow~=4.0",
    "scikit-learn~=0.24",
    "hydra-core~=1.1",
    "matplotlib~=3.4",
    "prefect~=0.14",
    "cloudpickle~=1.6",
]
requires-python = ">=3.8"

在装完依赖后,我们只需要在项目目录下执行pdm run python就可以进入到对应的“虚拟环境”中,import 刚才安装的各种包。

数据版本管理

在这个演示项目中,我们使用最简单的 iris dataset 来进行建模。分以下几个步骤:

  1. 从 sklearn 中把这份数据导出来,存成 parquet 格式,把它放到项目的data/raw目录下。
  2. 执行pdm run dvc --cd data add raw/iris.parquet添加文件到 dvc 中。
  3. 执行pdm run dvc --cd data push将文件同步到远程 s3 bucket 中。
  4. 执行git add raw/iris.parquet.dvc将文件版本信息通过 git 来管理。

我们也可以在 aws 客户端检查一下数据是否已经上传:

❯ aws s3 ls s3://dvc-bucket-89c41d1/65/
2021-06-23 12:32:43       5408 6c39bcb78e4c2d2bef07f0af8bdbc7

可以看到 dvc 的整体使用体验上与 git 还是非常类似的,核心思想就是把文件的版本信息通过 git 来管理,这样每次代码提交时,都会记录当时的数据版本。而具体的数据文件,可以选择同步到远程 s3 存储中,这样其它开发者也可以通过dvc pull来获取到对应版本的数据。

看了下 dvc 的官网,发现他们现在的功能也已经远不止数据版本管理了,还加上了流程执行,实验记录等功能。不过看起来略复杂,我们还是先使用简单的版本管理功能即可。

Pipeline 开发

Pipeline 的主题流程比较简单,我们做两个简单的交叉特征,然后再用 lightgbm 来训练,主要是为了来演示如何利用上述所说的参数管理,实验管理等框架。

特征处理

在特征方面,我们暂时没有引入复杂的 feature store,而是利用 sklearn pipeline 构建了一个标准的特征转换流程。然后我们利用 pickle 保存到 s3 上的 artifact bucket 进行存储,后续在模型服务时可以直接载入这个 pipeline 来对用户请求进行特征构建。

def persist_pipeline(pipeline, bucket):
    key = 'feature_pipeline.pkl'
    pkl_byte_obj = cloudpickle.dumps(pipeline)
    s3 = boto3.client('s3')
    s3.put_object(Bucket=bucket, Key=key, Body=pkl_byte_obj)

当然这个办法看起来比较土,很多高级 transform 工具会直接把特征部分也编译到模型图里,例如 tensorflow 之类的框架。

参数配置

我们利用 Hydra 把参数统一放置在 conf 文件夹下,通过 git 进行管理。这样做的好处是可以比较清楚在同一个地方看到所有的配置项,包括基础设施的配置和模型参数的配置等。例如:

work_dir: "${hydra:runtime.cwd}"
data_dir: "${work_dir}/data"
s3_bucket: "artifact-bucket-c35d05d"
base_uri: "http://zijie0.link"
params:
  learning_rate: 0.1
  num_iterations: 10
  feature_fraction: 0.8
  bagging_fraction: 0.8

另外我们也可以在运行时灵活调整参数,例如执行pdm run python pipeline.py params.learning_rate=0.02修改 learning_rate 参数。

调度运行

调度运行框架一般都会比较复杂,我们这里用的 Prefect 也不例外。在演示中只添加了最基础的 Task 和 Flow 概念,跟 Hydra 结合起来的代码结构如下:

@hydra.main(config_path="conf", config_name="config")
def main(config):
    with Flow('LightGBM Pipeline') as flow:
        build_features(config)
        train_model(config,
                    learning_rate=config.params.learning_rate,
                    num_iterations=config.params.num_iterations,
                    feature_fraction=config.params.feature_fraction,
                    bagging_fraction=config.params.bagging_fraction)
    flow.run()

当我们在 Prefect UI 上执行 flow 时,我们更希望能手动输入各种参数变量,这时候可以使用Parameter类来实现:

with Flow('LightGBM Pipeline') as flow:
    learning_rate = Parameter('learning_rate', default=0.1)
    num_iterations = Parameter('num_iterations', default=50)
    feature_fraction = Parameter('feature_fraction', default=0.8)
    bagging_fraction = Parameter('bagging_fraction', default=0.8)
    build_features(config)
    train_model(config,
                learning_rate=learning_rate,
                num_iterations=num_iterations,
                feature_fraction=feature_fraction,
                bagging_fraction=bagging_fraction)
flow.register("Iris Prediction")

实验记录

我们利用 MLFlow 来进行实验记录,需要修改的代码也不多:

# Enable auto logging
mlflow.set_tracking_uri(config.mlflow.uri)
mlflow.lightgbm.autolog()

with mlflow.start_run() as run:
    # Train model
    params = {
        "objective": "multiclass",
        "num_class": 3,
        "learning_rate": config.params.learning_rate,
        "num_iterations": config.params.num_iterations,
        "metric": "multi_logloss",
        "feature_fraction": config.params.feature_fraction,
        "bagging_fraction": config.params.bagging_fraction,
        "seed": 42,
    }

    model = lgb.train(params, train_data, valid_sets=[train_data])

    # Evaluate model
    y_proba = model.predict(X_test)
    y_pred = y_proba.argmax(axis=1)

    loss = log_loss(y_test, y_proba)
    acc = accuracy_score(y_test, y_pred)

    # Log metrics
    mlflow.log_metrics({
        "log_loss": loss,
        "accuracy": acc
    })

组合起来

我们把上述的几个步骤串起来之后,可以直接运行pdm run python pipeline.py来触发整体运行,通过 Prefect 来调度执行,将实验结果记录到 MLFlow 上,并保存模型,同时特征也会保存到 s3 的 artifact 存储中。

v2-4c9feaee16c98af1988e3e18a7fd01a8_1440w

MLFlow 中也会记录各种模型信息,比如 feature importance 等。如果是深度学习模型,还可以支持 tensorboard。

v2-8414a8a1a6cd9b63c9fddd6d5929f38e_1440w

当然也支持多次运行间的对比。

v2-5ff7de88aabf0bde8e9b9bac23f62ee6_1440w

另外 Prefect 也有很不错的 UI 管理支持。不过我通过 helm 来部署发现一些静态资源有些问题,没有花时间进一步在 aws 上实验。在本地用 Docker 玩了一下,发现 UI 非常新潮,功能上也挺强大(作为开源项目),日志,调度,多 agent 等都有。

v2-603613c834fc7779a23e4940821fc297_1440w

支持动态参数:

v2-90c08a567240f760bc2776f1303be939_1440w

甘特图,流程结构也是一应俱全!

v2-6c6895c435b58bb1d27a75fa42938196_1440w
v2-d771b3e42becd2c91d05599182ebf525_1440w

甚至还带了个非常灵活的 graphql 查询引擎,可以做非常灵活的系统对接与扩展。

v2-25125261d59e92d60f2b1ef7c22f6738_1440w

上线部署

模型服务

有了训练好的模型,我们就可以创建一个 web 服务对外提供 iris 分类的智能预测功能了!这里我们使用了 FastAPI 作为 web 框架来搭建这个服务,总体会比较基础。像 TFServing,KFServing 则要强大的多,能够提供 batch inference,蓝绿部署/影子模型等各种高级功能。

FastAPI 相关的代码也非常简单:

@app.post("/predict")
def predict(request: PredictRequest):
    # Can be used for evaluation
    request_id = uuid.uuid4()
    df = pd.DataFrame(columns=['sepal_length', 'sepal_width', 'petal_length', 'petal_width'],
                      data=[[request.sepal.length, request.sepal.width, request.petal.length, request.petal.width]])

    pipeline = load_pipeline()
    df = pipeline.fit_transform(df)

    # TODO: save request for data and model monitoring
    y_pred = np.argmax(model.predict(df))
    return {"request_id": request_id, "flower": flower_name_by_index[y_pred]}

这里我们做了几个预留设计:

  1. 演示了通过 cloudpickle 来保存和读取 feature transform,保证线上线下的特征构造一致性。更进一步,我们还可以使用 feature store 或其它带数据转换的整体模型方案。
  2. 在服务请求时生成了 request_id,后续应该可以把这个信息保留下来,以便在真实值产生后进行模型评估,并进一步做模型监控。
  3. 请求数据本身也应该监控其数据分布,异常或错误值等信息,确保线上线下数据分布的一致性,并及时对异常情况进行告警。

部署

我们同样也可以用 pulumi 来进行后续的上线部署,包括镜像 repo 的创建,serving image 的打包,服务路由设置,服务节点的动态扩缩容等等,一条pulumi up,我们的服务就跑起来了!

// 从之前的 stack 获取 k8s 配置信息
const provider = new k8s.Provider('provider', {
    kubeconfig: baseStack.requireOutput('kubeconfig'),
})

// 打镜像,推到 aws
const image = awsx.ecr.buildAndPushImage('iris-image', {
    context: '../',
});

// 构建 pod, deploy, service
const podBuilder = new kx.PodBuilder({
    containers: [{
        image: image.imageValue,
        ports: {http: 80},
        env: {
            'LISTEN_PORT': '80',
            'MLFLOW_TRACKING_URI': baseStack.requireOutput('traefikURI').apply((baseURI: string) => `${baseURI}/mlflow`),
            'MLFLOW_RUN_ID': config.require('runID'),
        }
    }],
    serviceAccountName: baseStack.requireOutput('modelsServiceAccountName'),
});

// 这里可以起多个 replica 做扩展
const deployment = new kx.Deployment('iris-serving', {
    spec: podBuilder.asDeploymentSpec({replicas: 1})
}, {provider});

const service = deployment.createService();

完成之后,我们可以来测试一下:

v2-d2a43e3b3576f496746aec5d4fd6fdf9_1440w

成功啦!

CI/CD

在持续集成方面,我们可以直接利用 github 的 actions,实现代码 push 后的自动测试,打包部署。这里因为没有设置 aws 的 token,所以在 dvc 同步 s3 文件时失败了。

v2-05c47a8ba3c515856df3140ed604dd1a_1440w

云计算开销

本次实验大约花了一天时间,其间在 aws 上创建了 80+的资源组件,不过没有跑多少实际的业务复杂。那么一天下来,我们总体的资金开销是多少呢?

v2-fa368821dcc1214097b444228994e146_1440w

嗯,差不多也就是一杯咖啡的价钱,如果使用量上去了可能就会贵起来啦。

TODO

可以看到使用一系列开源框架,结合好用的云服务工具,我们可以在几个小时内非常快速的搭出一个“云机器学习平台”的雏形来,还挺有模有样的。不过可以优化完善的地方还有很多,跟专业的商业产品距离还有点大。比如:

  1. 完善的数据质量检查与测试。
  2. 模型 pipeline 的集成测试。
  3. 大数据量的计算处理支持,例如 Spark,Dask 等。
  4. 高级的 feature store 的支持,提供更加丰富的数据管理和消费支持。
  5. 专业的数据管控支持,包括血缘关系,隐私检查,加密等。
  6. Notebook 的交互式探索开发支持,比如集成 JupyterHub。
  7. Pipeline 编排中的高级特性,包括完善的监控,失败后继续执行,与 k8s 的深度集成等。
  8. 模型训练的高级支持,如利用 CRD 来做分布式训练,异构硬件支持等。
  9. 实验分析的智能化分析检测支持。
  10. 在部署时利用 cloud native 的高级能力,例如自动扩容,sidecar 做流量切换等。
  11. 模型的持续监控,自动触发再训练与模型升级。
  12. 其它云原生服务的利用,例如权限认证体系,日志收集,安全监控等。

所以感觉可以改造提升的地方还是有很多的,这个领域目前的初创公司也是如雨后春笋般冒了出来。我们观远也正在着力打造新一代的云原生一站式数据智能平台,欢迎有兴趣的同学交流探讨,当然更好是能加入观远,一起做一些有意思的事情啦 :)

开源

最后把我个人尝试搭建的代码放在了 github 上,有兴趣的同学可以访问 cloud_ml_platform 进行参考。

本文由SuperQSC发布,转载请注明出处:https://www.chaozhixingqiu.com/2202.html

(1)
上一篇 2022年3月17日 下午7:54
下一篇 2022年3月17日

相关推荐