介绍 Amazon Timestream UNLOAD 语句:导出时间序列数据以获取更多洞察 数据库

2026-01-27 12:27:30 33

Amazon Timestream UNLOAD语句介绍:导出时间序列数据

关键要点

Amazon Timestream现在支持UNLOAD语句,可将时间序列数据安全且经济高效地导出到Amazon S3,以便进行额外分析。UNLOAD语句可帮助不同行业的用户如医疗、供应链和电子商务获取更深入的数据洞察。数据导出时可选择多种格式和选项,如列分区、压缩和加密,方便后续处理和分析。

在077 NOV 2023,由Shravanthi Rajagopal和Praneeth Kavuri撰写,文章详述了Amazon Timestream提供的UNLOAD语句。这项新功能使得用户能够轻松导出时间序列数据,以便进行更全面的数据分析和展现。

Amazon Timestream是一种完全托管、可扩展的无服务器时间序列数据库服务,便于用户每天存储和分析数万亿个事件。各行各业的客户因为Timestream的实时洞察能力、对关键业务应用的监控能力以及对海量实时事件的分析能力,而纷纷选择使用它。在此过程中,Timestream能根据工作负载自动扩展,且无需管理底层基础设施,从而简化了数据处理的复杂性。

许多Timestream用户希望通过将时间序列数据应用于其他上下文中来获取额外的价值,比如将数据添加到数据湖、训练机器学习模型以进行预测,或使用其他AWS或第三方服务来丰富数据。虽然该过程很耗时,因为它需要复杂的自定义解决方案。为了满足这些需求,我们高兴地介绍了UNLOAD语句,这是一种构建AI/ML管道及简化抽取转换加载ETL过程的安全经济的方法。

本文中,我们将展示如何使用UNLOAD语句将时间序列数据从Timestream导出到Amazon简单存储服务S3。

无论您是在预测气候趋势的气候研究者、监控患者健康的医疗提供者、监督生产的制造工程师、优化运营的供应链经理,还是追踪销售的电子商务经理,您都可以利用新的UNLOAD语句,从时间序列数据中获取更大的价值。

以下是UNLOAD语句助力的几个例子场景:

行业应用情境医疗分析医疗组织监测患者的健康指标,生成大量时间序列数据。可使用Timestream实时跟踪患者健康,导出数据至数据湖以进行丰富和进一步分析。供应链分析供应链分析师利用Timestream跟踪库存水平、交货时间等,以优化供应链。现可使用UNLOAD语句将数据导出至Amazon S3进行预测建模。电商分析电商经理使用Timestream跟踪电商网站的流量来源和销售数据,通过UNLOAD语句导出数据至Amazon S3并与其他相关数据进行综合分析。

UNLOAD语句概述

在Timestream中,UNLOAD语句允许用户以安全且高效的方式将查询结果导出到Amazon S3。借助UNLOAD,用户可以将时间序列数据以Apache Parquet或逗号分隔值CSV格式导出到指定的S3桶中,从而提供灵活存储、组合和使用其他服务如Amazon Athena等分析时间序列数据的渠道。UNLOAD语句还支持使用Amazon S3管理密钥SSES3或AWS密钥管理服务SSEKMS管理密钥对导出的数据进行加密,并可选择压缩以防止未授权的数据访问并降低存储成本。此外,用户可以选择一个或多个列来分区导出的数据,使后续服务仅扫描与查询相关的数据,从而减少处理时间和成本。

UNLOAD语句的语法如下:

sqlUNLOAD (SELECT 语句)TO s3//bucketname/folderWITH ( option = 表达式 [ ] )

在这里,option的定义如下:

sql{ partitionedby = ARRAY[ colname[] ] format = [ { CSV PARQUET } ] compression = [ { GZIP NONE } ] encryption = [ { SSEKMS SSES3 } ] kmskey = ltstringgt fielddelimiter =ltcharactergt escapedby = ltcharactergt includeheader = [{true false}] maxfilesize = ltvaluegt}

您可以通过AWS管理控制台、AWS命令行界面AWS CLI或AWS SDK运行UNLOAD语句。

解决方案概述

本文将讨论从Timestream导出数据至Amazon S3并获取额外洞察的步骤和最佳实践,包括以下高层次步骤:

向Timestream输入示例数据。执行数据分析。使用UNLOAD语句将查询结果导出至Amazon S3。创建AWS Glue数据目录表。利用Athena获取额外的业务洞察。

以下图示展示了解决方案架构。

请注意,如果您选择在自己的环境中重现此解决方案,您将根据公共定价支付所用AWS资源的费用。

我们将通过一个使用Timestream跟踪电子商务网站指标的示例用例来演示解决方案。每当产品销售时,销售数据包括产品ID、销售数量、引导客户访问网站的渠道如社交媒体或自然搜索、交易的时间戳及其他相关细节都会被记录并输入到Timestream。我们已创建示例数据,这数据是使用Faker生成并为演示进行了清洗。

数据包含以下信息:channel、ipaddress、sessionid、userid、event、usergroup、currenttime、query、productid、product和quantity。每当搜索导致购买时,productid和quantity将被记录。在向Timestream输入数据时,我们使用了以下数据模型:

维度 使用了channel、ipaddress、sessionid、userid、event和usergroup。有关维度的更多信息,请参见Amazon Timestream概念。时间 使用了currenttime。请注意,示例可能含有过时的时间。本文提供的示例代码在输入时会将其更改为最近的时间戳。多测量记录 使用了query、productid、product和quantity。有关更多信息,请参见多测量记录。

准备工作

在按照本文进行操作之前,您需具备以下准备条件:

创建数据库和表所需这些权限以允许CRUD操作。插入记录所需这些权限以允许插入操作。运行UNLOAD查询所需这些前提条件以将数据写入Amazon S3。在运行代码块之前,需将适当的AWS账户凭证导出为环境变量。

向Timestream输入数据

您可以使用本文中的示例代码创建数据库和表,然后向Timestream输入电子商务网站的销售数据。请完成以下步骤:

设置Jupyter笔记本或您选择的集成开发环境IDE。以下代码为了说明目的拆分成多个部分,并使用Python 39版本。如果您打算使用相同的代码,请将代码块合并为一个程序,或使用Jupyter笔记本跟随示例。初始化您的Timestream客户端:

python import boto3 from botocoreconfig import Config

session = boto3Session()

writeclient = sessionclient(timestreamwrite config=Config(regionname= readtimeout=20 maxpoolconnections=5000 retries={maxattempts 10})) queryclient = sessionclient(timestreamquery config=Config(regionname=))

创建数据库:

python databasename = timestreamsampledatabase writeclientcreatedatabase(DatabaseName=databasename) print(数据库 [s] 创建成功。 databasename)

创建数据库后,您可以在Timestream控制台上查看到它。

创建表:

python tablename = timestreamsampleunloadtable retentionproperties = { MemoryStoreRetentionPeriodInHours 12 MagneticStoreRetentionPeriodInDays 7 } writeclientcreatetable(DatabaseName=databasename TableName=tablename RetentionProperties=retentionproperties) print(表 [s] 创建成功。 tablename)

该表现在可以在Timestream控制台中查看。

介绍 Amazon Timestream UNLOAD 语句:导出时间序列数据以获取更多洞察 数据库

向表中输入示例数据:

python def submitbatch(records n) try result = writeclientwriterecords(DatabaseName=databasename TableName=tablename Records=records CommonAttributes={}) if result and result[ResponseMetadata] print(已处理 [d] 条记录。WriteRecords 状态 [s] (n result[ResponseMetadata][HTTPStatusCode])) except Exception as err print(错误 err)

import csv import time

with open(Downloads/sampleunloadcsv r) as csvfile csvreader = csvreader(csvfile) records = [] currenttime = str(int(round(timetime() 1000))) headerrow = [] # 提取csv文件内容 rowcounter = 0 for i row in enumerate(csvreader 1) if len(row) == 0 continue # 跳过csv头部 if i == 1 headerrow = row continue rowcounter = 1 record = { Dimensions [ {Name headerrow[0] Value row[0]} {Name headerrow[1] Value row[1]} {Name headerrow[2] Value row[2]} {Name headerrow[3] Value row[3]} {Name headerrow[4] Value row[4]} {Name headerrow[5] Value row[5]} ] Time str(int(currenttime) (i 50)) #修改进入的时间为当前时间 } measurevalues = [] if row[7] measurevaluesappend({ Name headerrow[7] Value row[7] Type VARCHAR }) if row[8] measurevaluesappend({ Name headerrow[8] Value row[8] Type VARCHAR }) if row[9] measurevaluesappend({ Name headerrow[9] Value row[9] Type VARCHAR }) if row[10] measurevaluesappend({ Name headerrow[10] Value row[10] Type DOUBLE }) recordupdate({ MeasureName metrics MeasureValueType MULTI MeasureValues measurevalues }) recordsappend(record) if len(records) == 100 submitbatch(records rowcounter) records = [] if records submitbatch(records rowcounter) print(已输入 d 条记录 rowcounter)

输入后,您可以在Timestream控制台的查询编辑器中预览表的内容。

执行数据分析

在Timestream中,您可以对输入的数据进行实时分析。例如,您可以查询过去一天内每种产品的单位销售量、过去一周内通过社交媒体广告进入商店的客户数量、销售趋势、过去一小时的购买模式等等。

要找出过去24小时内各产品的销量,请使用以下查询:

全局加速器永久免费

sqlSELECT sum(quantity) as numberofunitssold product FROM timestreamsampledatabasetimestreamsampleunloadtable WHERE time between ago(1d) and now() GROUP BY product

将数据导出至Amazon S3

您可以使用UNLOAD语句将时间序列数据导出到Amazon S3以进行额外分析。在本例中,我们分析客户根据他们到达网站的渠道。您可以使用partitionedby子句来将渠道特定的数据导出到一个文件夹中。在本例中,我们使用Parquet格式导出数据:

sqlUNLOAD(SELECT userid ipaddress event sessionid measurename time query quantity productid channel FROM timestreamsampledatabasetimestreamsampleunloadtable WHERE time BETWEEN ago(2d) AND now()) TO s3//ltyourbucketnamegt/partitionbychannel WITH (format = PARQUET partitionedby = ARRAY[channel])

当您使用partitionedby子句时,partitionedby字段中使用的列必须与SELECT语句的最后几列相同,并以出现的顺序放入ARRAY值中。

运行包含UNLOAD语句的查询后,您可以在查询结果选项卡的导出至Amazon S3摘要部分查看详细信息。

查看Amazon S3中的results文件夹时,您会看到数据按渠道名称分区。

创建AWS Glue数据目录表

您可以创建AWS Glue爬虫,以扫描S3桶中的数据,推断模式,并为从Timestream导出的数据在AWS Glue数据目录中创建一个元数据表。假设您在AWS Glue中具有所需的权限,在本节中,我们将提供两种选项:分别为每个渠道创建一个元数据文件,或爬取整个results文件夹并自动检测分区。

选项1:为每个渠道单独创建一个AWS Glue元数据文件

如果您需要针对每个渠道进行不同的分析,并且使用partitionedby子句将时间序列数据按渠道分开,您可以为特定渠道生成AWS Glue数据目录。在本例中,我们为社交媒体渠道创建数据目录。请