• spark支持深度学习批量推理


    背景

    在数据量较大的业务场景中,spark在数据处理、传统机器学习训练、
    深度学习相关业务,能取得较明显的效率提升。
    本篇围绕spark大数据背景下的推理,介绍一些优雅的使用方式。

    spark适用场景

    1. 大数据量自定义方法处理、类sql处理
    2. 传统机器学习方法(k-means、xgboost、lr…)
    3. 分布式深度学习推理
      在这里插入图片描述

    目前在10亿+数据量的推理场景中使用,需要用户自己实现批数据准备,基于RDD的方法完成模型推理输出。
    业务使用中的问题:

    1. 模型文件重复导入加载
    2. 自定义批数据准备,脱离深度学习dataloader框架,操作略显麻烦,有性能和内存oom等问题。

    实践

    spark加速深度学习推理

    spark加速深度学习推理,基本思路为:

    1. 开启不定量worker并行执行(cpu或gpu)推理任务
    2. 所有worker共享同一份模型参数
    3. 依赖spark pandas udf功能,方便并行处理 dataframe 数据
    4. 依赖深度学习框架,方便实现最优批数据划分
      下面以pytorch resnet 为实践demo

    加载&&广播模型参数

    广播模型参数,不仅能减少模型重复加载带来的流量和io,而且能加速推理前模型加载的速度。
    driver广播模型参数:

    # Load ResNet50 on driver node and broadcast its state.
    model_state = models.resnet50(pretrained=True).state_dict()
    bc_model_state = sc.broadcast(model_state)
    
    • 1
    • 2
    • 3

    worker读取模型参数:

    def get_model_for_eval():
      """Gets the broadcasted model."""
      model = models.resnet50(pretrained=True)
      model.load_state_dict(bc_model_state.value)
      model.eval()
      return model
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    实现基于dataframe的dataset

    目前主流的深度学习框架,dataset的实现大多基于本地存储,在读取分布式存储的场景 需要用户自定义实现。
    自定义实现有2个方法:

    1. 使用分布式存储的api接口读取文件内容
    2. dataset读取dataframe二进制文件内容

    方法一迭代与使用的存储类型会保持同步,且每次使用前需要明确使用的分布式存储,虽然实现方法容易但是使用流程略显麻烦。
    方法二不需要关心分布式存储类型,只要需要获取并解析spark dataframe列传入内容即可。

    本文采用方法二实现dataset:

    # 从二进制流中解析图片信息
    def pil_loader(binary_file):
        # open path as file to avoid ResourceWarning (https://github.com/python-pillow/Pillow/issues/835)
        image_io = io.BytesIO(binary_file)
        img = Image.open(image_io)
        return img.convert('RGB')
        
        
    # Create a custom PyTorch dataset class.
    class ImageDataset(Dataset):
        def __init__(self, data, transform=None):
            self.data = data
            self.transform = transform
    
        def __len__(self):
            return len(self.data)
    
        def __getitem__(self, index):
            image = pil_loader(self.data[index])
            if self.transform is not None:
                image = self.transform(image)
            return image
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    实现批量推理的pandas udf

    Pandas udf是基于RDD的一个低门槛高性能的实现方法,pandas udf能自定义处理逻辑,以列的方式操作datafrme内容。
    这是社区目前推荐的自定义处理方式。

    # Define the function for model inference.
    # PyArrow >= 1.0.0 must be installed;
    @pandas_udf(ArrayType(FloatType()))
    def predict_batch_udf(binaray_data: pd.Series) -> pd.Series:
        transform = transforms.Compose([
            transforms.Resize(224),
            transforms.CenterCrop(224),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225])
        ])
        images = ImageDataset(binaray_data, transform=transform)
        loader = torch.utils.data.DataLoader(images, batch_size=500, num_workers=8)
        model = get_model_for_eval()
        model.to(device)
        all_predictions = []
        with torch.no_grad():
            for batch in loader:
                predictions = list(model(batch.to(device)).cpu().numpy())
                for prediction in predictions:
                    all_predictions.append(prediction)
        return pd.Series(all_predictions)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    # 调用pandas udf
    predictions_df = df. \
                     select(col("filename"), predict_batch_udf(col("data")).alias("prediction"))
    
    • 1
    • 2
    • 3

    更多代码细节:
    https://github.com/Crazybean-lwb/deeplearning-pyspark/blob/master/examples/pytorch-inference.py

    模型仓加速推理

    打通到模型仓mlflow功能:

    • 模型存储和版本管理
    • 便捷取用
    • 适用spark datarame更高阶的pandas udf实现

    在这里插入图片描述

    # Create the PySpark UDF
    import mlflow.pyfunc
    pyfunc_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)
    
    # 调用pandas udf
    df = spark_df.withColumn("prediction", pyfunc_udf(struct([...])))
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    参考信息:

    1. pytorch分布式批量推理
    2. tensorflow分布式批量推理
    3. 模型仓mlflow协助分布式批量推理
  • 相关阅读:
    目录优先的图片库网站PiGallery2
    浅谈AI人体姿态识别技术的先进性及安防视频监控应用场景
    分享五款好用的PDF编辑工具
    vue入门到精通-第一个vue程序和基本语法组件
    day-3-2-3
    Window系统安装JDK8与Maven
    KubeEdge-Ianvs v0.2 发布:终身学习支持非结构化场景
    strlen函数详解
    IIoT系统架构
    java计算机毕业设计酒店预约及管理系统源码+mysql数据库+系统+lw文档+部署
  • 原文地址:https://blog.csdn.net/crazybean_lwb/article/details/131250802