在077 NOV 2023,由Shravanthi Rajagopal和Praneeth Kavuri撰写,文章详述了Amazon Timestream提供的UNLOAD语句。这项新功能使得用户能够轻松导出时间序列数据,以便进行更全面的数据分析和展现。
Amazon Timestream是一种完全托管、可扩展的无服务器时间序列数据库服务,便于用户每天存储和分析数万亿个事件。各行各业的客户因为Timestream的实时洞察能力、对关键业务应用的监控能力以及对海量实时事件的分析能力,而纷纷选择使用它。在此过程中,Timestream能根据工作负载自动扩展,且无需管理底层基础设施,从而简化了数据处理的复杂性。
许多Timestream用户希望通过将时间序列数据应用于其他上下文中来获取额外的价值,比如将数据添加到数据湖、训练机器学习模型以进行预测,或使用其他AWS或第三方服务来丰富数据。虽然该过程很耗时,因为它需要复杂的自定义解决方案。为了满足这些需求,我们高兴地介绍了UNLOAD语句,这是一种构建AI/ML管道及简化抽取转换加载ETL过程的安全经济的方法。
本文中,我们将展示如何使用UNLOAD语句将时间序列数据从Timestream导出到Amazon简单存储服务S3。
无论您是在预测气候趋势的气候研究者、监控患者健康的医疗提供者、监督生产的制造工程师、优化运营的供应链经理,还是追踪销售的电子商务经理,您都可以利用新的UNLOAD语句,从时间序列数据中获取更大的价值。
以下是UNLOAD语句助力的几个例子场景:
行业应用情境医疗分析医疗组织监测患者的健康指标,生成大量时间序列数据。可使用Timestream实时跟踪患者健康,导出数据至数据湖以进行丰富和进一步分析。供应链分析供应链分析师利用Timestream跟踪库存水平、交货时间等,以优化供应链。现可使用UNLOAD语句将数据导出至Amazon S3进行预测建模。电商分析电商经理使用Timestream跟踪电商网站的流量来源和销售数据,通过UNLOAD语句导出数据至Amazon S3并与其他相关数据进行综合分析。在Timestream中,UNLOAD语句允许用户以安全且高效的方式将查询结果导出到Amazon S3。借助UNLOAD,用户可以将时间序列数据以Apache Parquet或逗号分隔值CSV格式导出到指定的S3桶中,从而提供灵活存储、组合和使用其他服务如Amazon Athena等分析时间序列数据的渠道。UNLOAD语句还支持使用Amazon S3管理密钥SSES3或AWS密钥管理服务SSEKMS管理密钥对导出的数据进行加密,并可选择压缩以防止未授权的数据访问并降低存储成本。此外,用户可以选择一个或多个列来分区导出的数据,使后续服务仅扫描与查询相关的数据,从而减少处理时间和成本。
UNLOAD语句的语法如下:
sqlUNLOAD (SELECT 语句)TO s3//bucketname/folderWITH ( option = 表达式 [ ] )
在这里,option的定义如下:
sql{ partitionedby = ARRAY[ colname[] ] format = [ { CSV PARQUET } ] compression = [ { GZIP NONE } ] encryption = [ { SSEKMS SSES3 } ] kmskey = ltstringgt fielddelimiter =ltcharactergt escapedby = ltcharactergt includeheader = [{true false}] maxfilesize = ltvaluegt}
您可以通过AWS管理控制台、AWS命令行界面AWS CLI或AWS SDK运行UNLOAD语句。
本文将讨论从Timestream导出数据至Amazon S3并获取额外洞察的步骤和最佳实践,包括以下高层次步骤:
向Timestream输入示例数据。执行数据分析。使用UNLOAD语句将查询结果导出至Amazon S3。创建AWS Glue数据目录表。利用Athena获取额外的业务洞察。以下图示展示了解决方案架构。
请注意,如果您选择在自己的环境中重现此解决方案,您将根据公共定价支付所用AWS资源的费用。
我们将通过一个使用Timestream跟踪电子商务网站指标的示例用例来演示解决方案。每当产品销售时,销售数据包括产品ID、销售数量、引导客户访问网站的渠道如社交媒体或自然搜索、交易的时间戳及其他相关细节都会被记录并输入到Timestream。我们已创建示例数据,这数据是使用Faker生成并为演示进行了清洗。
数据包含以下信息:channel、ipaddress、sessionid、userid、event、usergroup、currenttime、query、productid、product和quantity。每当搜索导致购买时,productid和quantity将被记录。在向Timestream输入数据时,我们使用了以下数据模型:
维度 使用了channel、ipaddress、sessionid、userid、event和usergroup。有关维度的更多信息,请参见Amazon Timestream概念。时间 使用了currenttime。请注意,示例可能含有过时的时间。本文提供的示例代码在输入时会将其更改为最近的时间戳。多测量记录 使用了query、productid、product和quantity。有关更多信息,请参见多测量记录。在按照本文进行操作之前,您需具备以下准备条件:
创建数据库和表所需这些权限以允许CRUD操作。插入记录所需这些权限以允许插入操作。运行UNLOAD查询所需这些前提条件以将数据写入Amazon S3。在运行代码块之前,需将适当的AWS账户凭证导出为环境变量。您可以使用本文中的示例代码创建数据库和表,然后向Timestream输入电子商务网站的销售数据。请完成以下步骤:
设置Jupyter笔记本或您选择的集成开发环境IDE。以下代码为了说明目的拆分成多个部分,并使用Python 39版本。如果您打算使用相同的代码,请将代码块合并为一个程序,或使用Jupyter笔记本跟随示例。初始化您的Timestream客户端:python import boto3 from botocoreconfig import Config
session = boto3Session()
writeclient = sessionclient(timestreamwrite config=Config(regionname= readtimeout=20 maxpoolconnections=5000 retries={maxattempts 10})) queryclient = sessionclient(timestreamquery config=Config(regionname=))
创建数据库:python databasename = timestreamsampledatabase writeclientcreatedatabase(DatabaseName=databasename) print(数据库 [s] 创建成功。 databasename)
创建数据库后,您可以在Timestream控制台上查看到它。
创建表:python tablename = timestreamsampleunloadtable retentionproperties = { MemoryStoreRetentionPeriodInHours 12 MagneticStoreRetentionPeriodInDays 7 } writeclientcreatetable(DatabaseName=databasename TableName=tablename RetentionProperties=retentionproperties) print(表 [s] 创建成功。 tablename)
该表现在可以在Timestream控制台中查看。

python def submitbatch(records n) try result = writeclientwriterecords(DatabaseName=databasename TableName=tablename Records=records CommonAttributes={}) if result and result[ResponseMetadata] print(已处理 [d] 条记录。WriteRecords 状态 [s] (n result[ResponseMetadata][HTTPStatusCode])) except Exception as err print(错误 err)
import csv import time
with open(Downloads/sampleunloadcsv r) as csvfile csvreader = csvreader(csvfile) records = [] currenttime = str(int(round(timetime() 1000))) headerrow = [] # 提取csv文件内容 rowcounter = 0 for i row in enumerate(csvreader 1) if len(row) == 0 continue # 跳过csv头部 if i == 1 headerrow = row continue rowcounter = 1 record = { Dimensions [ {Name headerrow[0] Value row[0]} {Name headerrow[1] Value row[1]} {Name headerrow[2] Value row[2]} {Name headerrow[3] Value row[3]} {Name headerrow[4] Value row[4]} {Name headerrow[5] Value row[5]} ] Time str(int(currenttime) (i 50)) #修改进入的时间为当前时间 } measurevalues = [] if row[7] measurevaluesappend({ Name headerrow[7] Value row[7] Type VARCHAR }) if row[8] measurevaluesappend({ Name headerrow[8] Value row[8] Type VARCHAR }) if row[9] measurevaluesappend({ Name headerrow[9] Value row[9] Type VARCHAR }) if row[10] measurevaluesappend({ Name headerrow[10] Value row[10] Type DOUBLE }) recordupdate({ MeasureName metrics MeasureValueType MULTI MeasureValues measurevalues }) recordsappend(record) if len(records) == 100 submitbatch(records rowcounter) records = [] if records submitbatch(records rowcounter) print(已输入 d 条记录 rowcounter)
输入后,您可以在Timestream控制台的查询编辑器中预览表的内容。
在Timestream中,您可以对输入的数据进行实时分析。例如,您可以查询过去一天内每种产品的单位销售量、过去一周内通过社交媒体广告进入商店的客户数量、销售趋势、过去一小时的购买模式等等。
要找出过去24小时内各产品的销量,请使用以下查询:
全局加速器永久免费sqlSELECT sum(quantity) as numberofunitssold product FROM timestreamsampledatabasetimestreamsampleunloadtable WHERE time between ago(1d) and now() GROUP BY product
您可以使用UNLOAD语句将时间序列数据导出到Amazon S3以进行额外分析。在本例中,我们分析客户根据他们到达网站的渠道。您可以使用partitionedby子句来将渠道特定的数据导出到一个文件夹中。在本例中,我们使用Parquet格式导出数据:
sqlUNLOAD(SELECT userid ipaddress event sessionid measurename time query quantity productid channel FROM timestreamsampledatabasetimestreamsampleunloadtable WHERE time BETWEEN ago(2d) AND now()) TO s3//ltyourbucketnamegt/partitionbychannel WITH (format = PARQUET partitionedby = ARRAY[channel])
当您使用partitionedby子句时,partitionedby字段中使用的列必须与SELECT语句的最后几列相同,并以出现的顺序放入ARRAY值中。
运行包含UNLOAD语句的查询后,您可以在查询结果选项卡的导出至Amazon S3摘要部分查看详细信息。
查看Amazon S3中的results文件夹时,您会看到数据按渠道名称分区。
您可以创建AWS Glue爬虫,以扫描S3桶中的数据,推断模式,并为从Timestream导出的数据在AWS Glue数据目录中创建一个元数据表。假设您在AWS Glue中具有所需的权限,在本节中,我们将提供两种选项:分别为每个渠道创建一个元数据文件,或爬取整个results文件夹并自动检测分区。
如果您需要针对每个渠道进行不同的分析,并且使用partitionedby子句将时间序列数据按渠道分开,您可以为特定渠道生成AWS Glue数据目录。在本例中,我们为社交媒体渠道创建数据目录。请
01-27
通过亚马逊 OpenSearch 服务将长期日志费用降低 4800 大数据博客
降低长期日志开销达4800 的亚马逊 OpenSearch 服务关键要点使用 Amazon OpenSearch Service 存储日志数据,能够显著降低存储成本。文章讨论了 OpenSearch ...
01-27
如何降低 Amazon Kinesis 视频流的延迟 第 1 部分 物联网官方博客
降低 Amazon Kinesis Video Streams 延迟的方法 第 1 部分作者:Dean Colcott,发表于2022年12月21日,来源于 文章类型、最佳实践、专家 (400)、Ki...
01-27
在您的生成式人工智能应用中使用网页搜索 API 和 Amazon Bedrock 代理集成动态网页内
在生成型 AI 应用中集成动态网页内容关键要点功能增强:通过将 Amazon Bedrock Agent 与网页搜索 API 集成,提升聊天机器人实时搜索和动态信息检索能力。简化操作:开发者可借助 A...
01-27
在 Amazon QuickSight 中使用跨表筛选器和控件 商业智能博客
在 Amazon QuickSight 中构建跨表筛选器和控件关键要点Amazon QuickSight 通过引入跨表筛选器和控件功能,简化了分析过程。新功能允许用户跨多个表创建、删除和编辑筛选器。控...