AWS Redshift ETL的几个性能最佳实践
更新:HHH   时间:2023-1-7


Amazon Redshift 是一个支持SQL查询的、快速、可扩展的列式存储数据库,它支持PB级的数量查询,是适用于企业级的数据仓库。同时Redshift支持大规模并发查询、支持结果集缓存,响应查询时间最快至亚秒,比起其他数据仓库快将近十倍。借助 Redshift,您的等待时间更少,可将更多时间用于获取数据见解。

ETL在计算机领域是一个很流行的概念,意指将数据从一个或多个源头复制到目标系统的一个过程,其中包含三个步骤:
1,Extract 从数据源中选择/提取需要导出的数据
2,Transform 将导出的数据根据业务需要进行必要的格式/表现形式上的转换
3,Load 将转换后的数据导入目标系统

在使用Redshift之前需要将数据导入Redshift,即Redshift的ETL。例如数据库的迁移,将旧数据库中的数据转移到Redshift等等。
本文旨在分享我们bosicloud在日常工作中关于Redshift ETL方面的一些技巧及建议:

1,使用COPY命令将多个、大小相当的文件加载到Redshift
Amazon Redshift是一个MPP数据库,即大规模并行处理数据库,Redshift的背后是一个EC2集群,每个计算节点(ec2)进一步细分为slice,所有slice平分计算节点的处理能力。 每个节点的slice数量取决于群集的节点类型。 例如,每个DS2.XLARGE计算节点都有两个slice,而每个DS2.8XLARGE计算节点有16个slice。

进行加载数据工作时,最好的情况是整个工作量平分给所有的计算节点(EC2 node)的slice。当只加载一个大文件,或者加载多个大小差异较大的文件时,都可能导致计算节点工作量分配不均等,进而导致整个Redshift加载数据时性能低下。例如,我们Redshift有两个计算节点,在加载一个大文件时每个节点的工作负载如下:

从上图可以看到,加载的任务落在了compute-0头上,而compute-1则是闲置的,这是一个木桶原理,Redshift最终花费的时间等于工作时间最长的那个计算节点。
所以我们最好将大文件切割为多个大小相同的小文件,并且文件总数量正好是计算节点的整倍数,从而每个计算节点可以分到数目相同的小文件。另外,我们还建议将这些小文件进行压缩,例如gzip, lzop, or bzip2因为Redshift加载及存储数据时都支持这几种压缩格式。显然,压缩后的数据更小了,加载的工作量也就更小了。

2,使用workload management合理调整Redshift queue
Redshift通过workload management(WLM)管理着多个queue,用户提交SQL查询任务到Redshift时,SQL会根据提交者所在的group被分派到group对应的queue排队等候执行。Redshift的内存及计算能力被分成许多个单元/单位,一个slot代表一个单元的内存及计算能力,一个slot同一时间可以执行一个SQL查询任务,不同的queue拥有不同数量的slot,slot的数量决定该queue能够同时并发执行多少个SQL查询任务。因为ETL往往伴随着许多COMMIT 操作,而COMMIT 都很消耗计算资源。为了进行ETL的同时不影响普通用户提交SQL query,我们bosicloud建议将ETL和普通用户提交的SQL query分发到不同的queue中。否则普通用户query可能由于等待ETL COMMIT导致不能及时响应。

另外,为了加快Redshift的COPY ETL过程,我们还可以通过wlm_query_slot_count参数调整ETL queue的slot数量,从而增加queue的内存、计算能力及SQL查询并发数量。

                                             Redshift的WLM设置界面

3,使用”BEGIN…COMMIT”减少COMMIT次数前面我们提到ELT是一个多步骤的任务,每个步骤最后往往需要执行一个COMMIT,而COMMIT又是一个昂贵的操作。所以我们bosicloud建议使用BEGIN…END 将能够合并的多个步骤尽量合并为一个步骤并只执行一次COMMIT,例如:
Begin
CREATE temporary staging_table;
INSERT INTO staging_table SELECT .. FROM source;
DELETE FROM table1 WHERE ???=???;
INSERT INTO table1 SELECT .. FROM staging_table;
DELETE FROM table2 WHERE ???=???;
INSERT INTO table2 SELECT .. FROM staging_table;
Commit

4,使用Redshift Spectrum for ad-hoc查询
在以往,我们可以会为了做ad-hoc查询而将S3上的数据ETL加载到Redshift进行查询。如果仅为了偶尔的一两次查询而进行ETL,这个ETL将显得非常昂贵、不划算。别忘了AWS最近推出了Redshift Spectrum新功能,即您可以直接利用Redshift Spectrum查询S3上的数据而无需将数据加载到Redshift中,虽然Specturm查询相比普通Redshift查询较慢,但比起ETL来说查询速度仍然是大大提升了。

5,关于ETL健康检查的SQL实用脚本:
 返回过去1天内queue的统计信息,例如最大队列长度和队列时间
select startqueue,node, datediff(ms,startqueue,startwork) as queue_time, datediff(ms, startwork, endtime) as commit_time, queuelen
from stl_commit_stats
where startqueue >= dateadd(day, -1, current_Date)
order by queuelen desc , queue_time desc;
 返回一个星期内所执行的COPY的相关信息,如COPY的开始时间(Starttime),所在queue的ID(query),SQL语句(querytxt),COPY的文件数量(n_files)及文件大小(size_mb)等等:
select q.starttime, s.query, substring(q.querytxt,1,120) as querytxt,
s.n_files, size_mb, s.time_seconds,
s.size_mb/decode(s.time_seconds,0,1,s.time_seconds) as mb_per_s
from (select query, count() as n_files,
sum(transfer_size/(1024
1024)) as size_MB, (max(end_Time) -
min(start_Time))/(1000000) as time_seconds , max(end_time) as end_time
from stl_s3client where http_method = 'GET' and query > 0
and transfer_time > 0 group by query ) as s
LEFT JOIN stl_Query as q on q.query = s.query
where s.end_Time >= dateadd(day, -7, current_Date)
order by s.time_Seconds desc, size_mb desc, s.end_time desc
limit 50;

 建立view视图查看每个表空间使用情况,请考虑将空间增长较快的表的内容unload到S3.

CREATE OR REPLACE VIEW admin.v_space_used_per_tbl
AS with info_table as ( SELECT TRIM(pgdb.datname) AS dbase_name
,TRIM(pgn.nspname) as schemaname
,TRIM(pgc.relname) AS tablename
,id AS tbl_oid
,b.mbytes AS megabytes
,CASE WHEN pgc.reldiststyle = 8
THEN a.rows_all_dist
ELSE a.rows END AS rowcount
,CASE WHEN pgc.reldiststyle = 8
THEN a.unsorted_rows_all_dist
ELSE a.unsorted_rows END AS unsorted_rowcount
,CASE WHEN pgc.reldiststyle = 8
THEN decode( det.n_sortkeys,0, NULL,DECODE( a.rows_all_dist,0,0, (a.unsorted_rows_all_dist::DECIMAL(32)/a.rows_all_dist)100))::DECIMAL(20,2)
ELSE decode( det.n_sortkeys,0, NULL,DECODE( a.rows,0,0, (a.unsorted_rows::DECIMAL(32)/a.rows)
100))::DECIMAL(20,2) END
AS pct_unsorted
FROM ( SELECT
db_id
,id
,name
,MAX(ROWS) AS rows_all_dist
,MAX(ROWS) - MAX(sorted_rows) AS unsorted_rows_all_dist
,SUM(rows) AS rows
,SUM(rows)-SUM(sorted_rows) AS unsorted_rows
FROM stv_tbl_perm
GROUP BY db_id, id, name
) AS a
INNER JOIN
pg_class AS pgc
ON pgc.oid = a.id
INNER JOIN
pg_namespace AS pgn
ON pgn.oid = pgc.relnamespace
INNER JOIN
pg_database AS pgdb
ON pgdb.oid = a.db_id
INNER JOIN (SELECT attrelid,
MIN(CASE attisdistkey WHEN 't' THEN attname ELSE NULL END) AS "distkey",
MIN(CASE attsortkeyord WHEN 1 THEN attname ELSE NULL END) AS head_sort,
MAX(attsortkeyord) AS n_sortkeys,
MAX(attencodingtype) AS max_enc,
SUM(case when attencodingtype <> 0 then 1 else 0 end)::DECIMAL(20,3)/COUNT(attencodingtype)::DECIMAL(20,3) 100.00 as pct_enc
FROM pg_attribute
GROUP BY 1) AS det ON det.attrelid = a.id
LEFT OUTER JOIN
( SELECT
tbl
,COUNT(
) AS mbytes
FROM stv_blocklist
GROUP BY tbl
) AS b
ON a.id=b.tbl
WHERE pgc.relowner > 1)
select info.*
,CASE WHEN info.rowcount = 0 THEN 'n/a'
WHEN info.pct_unsorted >= 20 THEN 'VACUUM SORT recommended'
ELSE 'n/a'
END AS recommendation
from info_table info;

 找出本周内最费时间的前50个SQL查询(多个相同的sql查询时间合并计算)
-- query runtimes
select trim(database) as DB, count(query) as n_qry, max(substring (qrytext,1,80)) as qrytext, min(run_seconds) as "min" , max(run_seconds) as "max", avg(run_seconds) as "avg", sum(run_seconds) as total, max(query) as max_query_id,
max(starttime)::date as last_run, aborted,
listagg(event, ', ') within group (order by query) as events
from (
select userid, label, stl_query.query, trim(database) as database, trim(querytxt) as qrytext, md5(trim(querytxt)) as qry_md5, starttime, endtime, datediff(seconds, starttime,endtime)::numeric(12,2) as run_seconds,
aborted, decode(alrt.event,'Very selective query filter','Filter','Scanned a large number of deleted rows','Deleted','Nested Loop Join in the query plan','Nested Loop','Distributed a large number of rows across the network','Distributed','Broadcasted a large number of rows across the network','Broadcast','Missing query planner statistics','Stats',alrt.event) as event
from stl_query
left outer join ( select query, trim(split_part(event,':',1)) as event from STL_ALERT_EVENT_LOG where event_time >= dateadd(day, -7, current_Date) group by query, trim(split_part(event,':',1)) ) as alrt on alrt.query = stl_query.query
where userid <> 1
-- and (querytxt like 'SELECT%' or querytxt like 'select%' )
-- and database = ''
and starttime >= dateadd(day, -7, current_Date)
)
group by database, label, qry_md5, aborted
order by total desc limit 50;

【关于博思云为】
作为一家专业的云计算服务型企业,博思云为专为客户提供 AWS 上的运营服务:包括架构咨询服务、迁移服务、云安全集成服务、混合云管理服务、大数据服务以及 DevOps 服务。目前,博思云为在大数据、DevOps、架构、数据库以及操作系统等都已取得厂商认证,在上海、南京、杭州、武汉等地设有分公司。为创新服务模式、引领 IT 服务业的发展,博思云为将持续投入资源开展智能混合云管理平台、图数据库的研发等。

返回云计算教程...