เฮ้ สโนว์เฟลค ส่งอีเมลหาฉัน
Snowflake สามารถส่งอีเมลได้แล้ว ดังนั้นมาสร้างขั้นตอนที่จัดเก็บไว้ซึ่งจะส่งผลลัพธ์ของแบบสอบถามให้คุณทุกเช้า รวมถึงหากมีวันหยุดที่ไหนสักแห่งในโลกที่ส่งผลต่อตัวเลขของคุณ
Snowflake สามารถส่งอีเมลได้แล้ว
หากต้องการส่งการแจ้งเตือนทางอีเมลด้วย Snowflake ตอนนี้คุณต้องการการเรียก SQL เพียงครั้งเดียว:
call system$send_email(
'my_email_int',
'[email protected], [email protected], [email protected]',
'This is the subject',
'This is the body'
);
จะแจ้งผู้ใช้ใน Snowflake ได้อย่างไร? ❄️ สรุปเกล็ดหิมะ — การแจ้งเตือนทางอีเมล
เป้าหมายของโพสต์นี้คือการเชื่อมโยงฟังก์ชันนี้เข้ากับตัวกำหนดตารางเวลาและรหัสการจัดรูปแบบ ดังนั้นทุกเช้าตั้งแต่วันจันทร์ถึงวันศุกร์เราจะได้รับผลลัพธ์ของแบบสอบถาม SQL รวมถึงวันหยุดที่กำลังจะมาถึง ชอบสิ่งนี้:
ขั้นตอนที่ 1: พัฒนากระบวนงานที่เก็บไว้ซึ่งจัดรูปแบบและส่งอีเมลผลลัพธ์ของแบบสอบถามใดๆ
นี่คือรหัสที่ฉันเขียนในขณะที่ดีบักความสามารถของฉันในการส่งอีเมลผลลัพธ์การค้นหาที่มีรูปแบบที่ดี:
with pretty_email_results as procedure()
returns string
language python
packages = ('snowflake-snowpark-python', 'tabulate')
handler = 'x'
as
$$
def x(session):
printed = session.sql(
"select * from table(result_scan(last_query_id(-1)))"
).to_pandas().to_markdown()
session.call('system$send_email',
'my_email_int',
'[email protected]',
'Email Alert: Task A has finished.',
printed)
$$
call pretty_email_results();
- ตอนนี้ Snowflake มี " ขั้นตอนที่ไม่ระบุตัวตน " ที่ให้คุณกำหนดและเรียกใช้ในขั้นตอนเดียว คุณไม่จำเป็นต้องทำ
create or replace
ก่อนแล้วcall
หลัง — ซึ่งมีประโยชน์อย่างยิ่งในขณะที่ทำการดีบั๊ก tabulate
ฉันเลือกที่จะเขียนขั้นตอน นี้ใน Python เพื่อใช้ไลบรารี ไลบรารีนี้จะจัดรูปแบบผลลัพธ์ของคิวรี เพื่อให้ดูสวยงามในอีเมลขาออกของเรา- Anaconda มี
tabulate
ไลบรารี่อยู่แล้วใน Snowflake ดังนั้นเราจึงใช้มันกับ linepackages = (‘tabulate’)
. - Snowflake สามารถเข้าถึงผลลัพธ์ของ การ
result_scan(last_query_id(-1)
ค้นหาล่าสุดที่ดำเนินการ ซึ่งอนุญาตให้pretty_email_results()
ส่งอีเมลผลลัพธ์ที่เราได้รับในขั้นตอนก่อนหน้า - Snowpark ให้
session
วัตถุแก่เราในขั้นตอนการจัดเก็บของเรา ซึ่งเราสามารถใช้เพื่อดำเนินการสแกนผลลัพธ์ด้วยsession.sql()
. session.sql()
ให้ Snowpark DataFrame แก่เรา และ.to_pandas()
เมื่อเรียกใช้งานก็จะได้ Pandas DataFrame จากนั้น Pandas.to_markdown()
จะโทรหาtabulate
— และมันจะบ่น เว้นแต่คุณจะไม่ได้ร้องขอแพ็คเกจนี้อย่างชัดแจ้งด้วยpackages=(...)
.
เมื่อการดีบักแบบโต้ตอบของคุณเสร็จสิ้น คุณสามารถย้ายโค้ดไปยังขั้นตอนที่เก็บไว้อย่างถาวรได้:
create or replace procedure email_last_results(send_to string, subject string)
returns string
language python
runtime_version=3.8
packages = ('snowflake-snowpark-python', 'tabulate')
handler = 'x'
execute as caller
as
$$
import snowflake
def x(session, send_to, subject):
try:
body = session.sql(
"select * from table(result_scan(last_query_id(-1))) limit 100"
).to_pandas().to_markdown()
except snowflake.snowpark.exceptions.SnowparkSQLException as e:
body = '%s\n%s' % (type(e), e)
session.call('system$send_email',
'my_email_int',
send_to,
subject,
body)
return 'email sent:\n%s' % body
$$;
execute as caller
จำเป็นที่นี่เพื่อให้กระบวนงานที่เก็บไว้สามารถค้นหาประวัติการสืบค้นของผู้ใช้ที่เรียกได้runtime_version=3.8
จะต้องมีความชัดเจนในขั้นตอนการจัดเก็บอย่างถาวร- ฉันเพิ่มโค้ดเพื่อตรวจจับข้อยกเว้นและส่งอีเมลสวยๆ อยู่ดี
- ฉันย้ายหัวเรื่องและผู้รับไปที่อาร์กิวเมนต์ขั้นตอน
- ฉันเพิ่ม a
limit 100
เพื่อป้องกันการส่งอีเมลขนาด TB
call email_last_results('[email protected]', 'results from snowflake');
ตอนนี้ขั้นตอนการจัดเก็บ SQL อย่างง่ายสามารถเรียกใช้แบบสอบถามและขอให้ขั้นตอน Python ส่งอีเมลผลลัพธ์:
create or replace procedure run_query_and_email_it()
returns string
execute as caller
as
$$
begin
-- any query you'd like to run
select 'now' as what, to_char(current_timestamp) as value
union all
select 'upcoming', day || ' ' ||holiday
from table(fh_db.public.holidays('US', [year(current_date), year(current_date)+1]))
where day between current_date() and current_date()+60
;
-- call the stored procedure that formats and emails the results
call email_last_results('[email protected]', 'upcoming holidays');
return 'done';
end
$$
;
- หากต้องการดูตารางที่มีวันหยุดทั้งหมดในอีก 60 วันข้างหน้าจะเรียกUDTF ที่กำหนดไว้ก่อนหน้านี้ ของฉัน
holidays
สร้างวันหยุดทั้งหมดใน SQL — ด้วย Python UDTF
call run_query_and_email_it();
Snowflake มีตัวกำหนดเวลางานที่พร้อมสำหรับคำขอของคุณ:
create or replace task email_me_frequently
warehouse = 's'
schedule = 'using cron 0 8 * * 1-5 America/Los_Angeles'
as call run_query_and_email_it();
หมายเหตุบางส่วน:
- ฉันขอให้ใช้โกดังของฉันที่ชื่อว่า “s” Snowflake ยังมีServerless Tasksแต่งานเหล่านั้นไม่ได้เรียกใช้กระบวนงานที่เก็บไว้ของ Python
- ฉันต้องแก้ไข
holidays()
UDF ของฉันโดยย้ายการอ้างอิง Python.zip
จากส เตจผู้ใช้ไปยังสเตจ ที่มีชื่อ เมื่อมันอยู่ในขั้นตอนผู้ใช้ของฉัน งานก็แสดงข้อผิดRemote file ‘holidays.zip’ was not found
พลาด - เปลี่ยนกำหนดการเป็น every
1 minute
หากคุณต้องการดีบัก
alter task email_me_frequently resume;
select *
from table(information_schema.task_history(
scheduled_time_range_start=>dateadd('hour',-48,current_timestamp()),
result_limit => 100));
Images generated by AI
ไม่ต้องกังวลมากเกินไปเกี่ยวกับการสร้างระบบแจ้งเตือน — Snowflake กำลังพัฒนาสิ่งนี้อยู่ ( ขณะนี้อยู่ในการแสดงตัวอย่างแบบส่วนตัว ):
CREATE ALERT Warehouse_Credit_Usage_Alert
WAREHOUSE = my_warehouse
SCHEDULE = ‘USING CRON 0 7 * * *UTC’ // everyday at 7 am
IF (EXISTS (SELECT
Warehouse_name,
SUM(CREDITS_USED) AS credits
FROM snowflake.account_usage.warehouse_metering_history
// aggregate warehouse Credit_used for the past 24 hours
WHERE datediff(hour, start_time, CURRENT_TIMESTAMP ())<=24
GROUP BY 1
HAVING credits > 10
ORDER BY 2 DESC))
THEN system$send_email (
‘My_email_notification_integration’,
‘admin1@company,com, [email protected]’,
‘Email Alert: Excessive warehouse usage!’,
‘Warehouse usage exceeds 10 credits in the past 24 hours’
)
- ลอง ใช้บัญชี ทดลองใช้ฟรีของ Snowflake — คุณต้องการเพียงที่อยู่อีเมลเพื่อเริ่มต้นใช้งาน
- เล่นกับตัวเลือกการจัดรูปแบบที่
tabulate
มีให้