企业文化

利用亚马逊位置服务和 AWS 分析服务从历史位置信息中获取洞察 大数据博客


利用 Amazon 位置服务和 AWS 分析服务获取历史位置数据洞见

文:Alan Peaty 和 Parag Srivastava 于 2024 年 3 月 13 日发表

主要要点

实时追踪资产的位置,为企业改进和未来变化提供依据。结合 Amazon 位置服务和 AWS 多项服务,构建高效的数据管道。使用 Amazon Athena 可进行地理空间数据查询,获取具体的行驶数据和可行的商业决策数据。

许多企业依赖于实体资产例如车辆来向最终客户提供服务。透过实时追踪这些资产并存储位置数据,资产拥有者能够获取有价值的洞见,以便持续提升业务并计划未来的变化。举例来说,一家配送公司拥有庞大的车队,可能需要了解当地政策变动如扩展超低排放区 ULEZ对其业务的影响。结合历史的车辆位置数据和来自其他来源的信息,公司能够制定实证方法以改善决策。例如,采购团队可利用此信息,有针对性地提前更换将受到政策影响的车辆。

开发者可以利用 Amazon 位置服务 将设备位置更新发布至 Amazon EventBridge,构建接近实时的数据管道,并将追踪的资产位置存储在 Amazon S3 中。此外,可以使用 AWS Lambda 将传入的位置数据与其他来源的数据进行丰富化,比如包含车辆维护详情的 Amazon DynamoDB 表。随后,数据分析师可利用 Amazon Athena 的地理空间查询能力来获取洞见,例如其车辆在扩大后的 ULEZ 办理区内运行的天数。因为不符合 ULEZ 排放标准的车辆在该区域内运行需支付每日费用,这样便可以利用位置数据及维护数据如车辆年龄、当前里程和当前排放标准来估算公司需支付的日费用。

本篇文章展示了如何利用 Amazon 位置服务、EventBridge、Lambda、Amazon Data Firehose 和 Amazon S3 构建一个位置感知的数据管道,并利用这些数据通过 AWS Glue 和 Athena 驱动有意义的洞见。

解决方案概述

这是一种完全无伺服器的解决方案,用于基于位置的资产管理。系统由以下几个接口组成:

IoT 或移动应用 移动应用或物联网IoT设备能够在公司车辆使用过程中进行追踪,并安全传输当前位置至 AWS 的数据接收层。此文不涉及接收方式。相反,我们的解决方案中的 Lambda 函数模拟车辆行驶并直接更新 Amazon 位置追踪器对象至随机位置。数据分析 商业分析师从多个数据源收集运营洞见,包括从车辆收集的位置信息。分析师正在寻找的问题例如:“某给定车辆在 proposed zone 内历史上花费了多少时间?如果去年政策已经实施,费用将是多么的高?”

以下图示展示了解决方案架构。

工作流程包含以下关键步骤:

利用亚马逊位置服务和 AWS 分析服务从历史位置信息中获取洞察 大数据博客使用 Amazon 位置服务的追踪功能来追踪车辆。通过 EventBridge 集成,过滤过的位置更新被发布至 EventBridge 事件总线。本解决方案使用 基于距离的 过滤来降低成本和抖动,只有当设备行驶距离超过 30 米984 英尺时,才会进行位置更新。Amazon 位置设备位置事件以 source [awsgeo] 和 detailtype [Location Device Position Event] 的形式到达 EventBridge 的 default 总线。创建一条规则将这些事件转发至下游的两个目标:一个 Lambda 函数和一个 Firehose 传输流。文章中介绍了根据目标上下两种不同模式,使数据提交到 S3 存储桶的不同方法:Lambda 函数 第一种方法使用 Lambda 函数来示范如何在数据管道中使用代码直接转换来自位置的数据。你可以更改 Lambda 函数,以从其他数据库获取附加的车辆信息例如,DynamoDB 表或客户关系管理系统来增强数据,然后将结果存储在 S3 存储桶中。在此模型中,对于每个传入事件都会调用 Lambda 函数。Firehose 传输流 第二种方法使用 Firehose 传输流来缓存和批量提交传入的位置更新,然后不经修改地将这些数据存储在 S3 存储桶中。此方法使用 GZIP 压缩来优化存储消耗和查询性能。此外,还可以使用 Data Firehose 的 数据转换 功能调用 Lambda 函数在批次中执行数据转换。AWS Glue 爬虫扫描两个 S3 存储桶的路径,根据推断的 schemas 填充 AWS Glue 数据库表,并使数据通过 AWS Glue 数据目录可用于其他分析应用程序。使用 Athena 对存储在 S3 存储桶中的位置数据进行地理空间查询。数据目录提供的元数据允许使用 Athena 的分析应用程序查找、读取和处理存储在 Amazon S3 中的位置数据。此解决方案还包括一个 Lambda 函数,该函数不断使用模拟的旅程数据更新 Amazon 位置追踪器。Lambda 函数根据设定的 EventBridge 规则以规则间隔触发。

你可以使用 AWS Samples GitHub 存储库 自行测试此解决方案。该存储库包含了试运行此解决方案所需的 AWS 无伺服器应用程序模型 (AWS SAM) 模板和 Lambda 代码。请参考 README 文件中的说明,了解如何配置和清除此解决方案。

某些屏幕截图中的视觉布局可能与您的 AWS 管理控制台 不同。

数据生成

在本节中,我们将讨论手动或自动生成旅程数据的步骤。

手动生成旅程数据

您可以使用 AWS 命令行介面 (AWS CLI) 命令 aws location batchupdatedeviceposition 手动更新设备位置。将 trackername、deviceid、Position 和 SampleTime 值替换为您自己的,并确保相距 30 米以上的连续更新才能使事件放置在 default EventBridge 事件总线上:

aws location batchupdatedeviceposition trackername lt trackernamegt updates [{DeviceId lt deviceidgt Position [lt longitudegt lt latitudegt] SampleTime lt YYYYMMDDThhmmssZgt}]

使用模拟器自动生成旅程数据

提供的 AWS CloudFormation 模板部署了一个 EventBridge 定时规则及一个应用的 Lambda 函数,模拟来自车辆的追踪更新。该规则默认启用,并按 SimulationIntervalMinutes CloudFormation 参数指定的频率运行。数据生成的 Lambda 函数更新 Amazon 位置追踪器,位置来源于车辆的基准位置随机偏移。

车辆名称和基准位置存储在 vehiclesjson 文件中。每个车辆的起始位置每天重置,基准位置设置为在特定日期内使其能够进出 ULEZ,提供现实的行程模拟。

您可以通过导航到 EventBridge 控制台的定时规则详细信息来 暂时禁用该规则。或者,将 GenerateDevicePositionsScheduleRule 资源的 State ENABLED 参数更改为 State DISABLED,在 templateyml 文件中进行修改。重新构建并重新部署 AWS SAM 模板使更改生效。

位置数据管道方法

本节中的配置由提供的 AWS SAM 模板自动部署。本节的信息旨在描述解决方案的相关部分。

Amazon 位置设备位置事件

Amazon 位置将设备位置更新事件以以下格式发送至 EventBridge:

json{ version0 idlt eventidgt detailtypeLocation Device Position Event sourceawsgeo accountlt accountnumbergt timelt YYYYMMDDThhmmssZgt regionlt regiongt resources[ arnawsgeolt regiongtlt accountnumbergttracker/lt trackernamegt ] detail{ EventTypeUPDATE TrackerNamelt trackernamegt DeviceIdlt deviceidgt SampleTimelt YYYYMMDDThhmmssZgt ReceivedTimelt YYYYMMDDThhmmsssssZgt Position[ ltlongitudegt ltlatitudegt ] }}

您可以选择性地指定一个 输入转换,在数据到达目标之前修改设备位置事件数据的格式和内容。

使用 Lambda 进行数据丰富

在此范例中,数据丰富通过调用 Lambda 函数实现。我们称此函数为 ProcessDevicePosition,并使用 Python 执行时。通过输入转换自定义转换,EventBridge 目标定义接收如下格式的事件数据:

json{ EventTypelt EventTypegt TrackerNamelt TrackerNamegt DeviceIdlt DeviceIdgt SampleTimelt SampleTimegt ReceivedTimelt ReceivedTimegt Position[lt Longitudegtlt Latitudegt]}

您可以根据下游业务逻辑处理事件的需要,进一步进行转变,例如将 Latitude 和 Longitude 数据重构为单独的键值对。

以下代码演示了 ProcessDevicePosition Lambda 函数的 Python 应用逻辑。为了简洁起见,省略了错误处理,完整代码可在 GitHub 存储库 中查看。

pythonimport jsonimport osimport uuidimport boto3

从 Lambda 函数中导入环境变数

bucketname = osenviron[S3BUCKETNAME]bucketprefix = osenviron[S3BUCKETLAMBDAPREFIX]

s3 = boto3client(s3)

def lambdahandler(event context) key = s/s/ssjson (bucketprefix event[DeviceId] event[SampleTime] str(uuiduuid4()) body = jsondumps(event separators=( )) bodyencoded = bodyencode(utf8) s3putobject(Bucket=bucketname Key=key Body=bodyencoded) return { statusCode 200 body success }

上述代码为 EventBridge 接收到的每个设备位置事件创建 S3 对象。该代码使用 DeviceId 作为前缀将对象写入存储桶。

您可以在上述 Lambda 函数代码中添加其他逻辑,以使用其他来源丰富事件数据。在 GitHub 存储库 中的示例演示了如何使用来自 DynamoDB 车辆维护表的数据丰富事件。

除先前提供的 AWS 身份和访问管理 (IAM) 权限外,ProcessDevicePosition 函数需要获得执行 S3 putobject 行为以及数据丰富逻辑所需的所有其他行为。解决方案所需的 IAM 权限记录在 templateyml 文件中。

json{ Version20121017 Statement[ { Action[ s3ListBucket ] Resource[ arnawss3lt S3BUCKETNAMEgt ] EffectAllow } { Action[ s3PutObject ] Resource[ arnawss3lt S3BUCKETNAMEgt/lt S3BUCKETLAMBDAPREFIXgt/ ] EffectAllow } ]}

使用 Amazon Data Firehose 建立数据管道

请按照以下步骤创建 Firehose 传输流:

在 Amazon Data Firehose 控制台上,选择导航面板中的 Firehose streams。选择 Create Firehose stream。对于 Source,选择 Direct PUT。对于 Destination,选择 Amazon S3。

对于 Firehose stream name,输入名称在此文章中为 ProcessDevicePositionFirehose。

使用有关位置数据存储的 S3 存储桶的详细信息配置目标设置,并设置分区策略:

飞鱼加速器app下载使用 lt S3BUCKETNAMEgt 和 lt S3BUCKETFIREHOSEPREFIXgt 来确定存储桶和对象前缀。使用 DeviceId 作为附加前缀,将对象写入存储桶。启用 Dynamic partitioning 和 New line delimiter,以确保根据 DeviceId 进行自动分区,并在传递给 Amazon S3 的对象中添加换行符。

这对于稍后由 AWS Glue 爬行数据以及 Athena 辨认单独记录是必需的。

创建 EventBridge 规则并附加目标

EventBridge 规则 ProcessDevicePosition