Эй, Снежинка, напиши мне письмо.
Теперь Snowflake может отправлять электронные письма, поэтому давайте создадим хранимую процедуру, которая будет отправлять вам результаты запроса каждое утро, в том числе, если где-то в мире есть праздник, который влияет на ваши показатели.
Снежинка теперь может отправлять электронные письма
Чтобы отправить уведомление по электронной почте с помощью Snowflake, вам теперь нужен только один вызов SQL:
call system$send_email(
'my_email_int',
'[email protected], [email protected], [email protected]',
'This is the subject',
'This is the body'
);
Как уведомить пользователей в Snowflake? ❄️ Снежинка в двух словах — уведомления по электронной почте
Цель этого поста — связать эту функциональность с планировщиком и некоторым кодом форматирования, чтобы каждое утро с понедельника по пятницу мы могли получать результаты SQL-запроса, включая любые предстоящие праздники. Как этот:

Шаг 1. Разработайте хранимую процедуру, которая форматирует и отправляет по электронной почте результат любого запроса.
Это код, который я написал во время отладки своих способностей отправлять по электронной почте хорошо отформатированный результат запроса:
with pretty_email_results as procedure()
returns string
language python
packages = ('snowflake-snowpark-python', 'tabulate')
handler = 'x'
as
$$
def x(session):
printed = session.sql(
"select * from table(result_scan(last_query_id(-1)))"
).to_pandas().to_markdown()
session.call('system$send_email',
'my_email_int',
'[email protected]',
'Email Alert: Task A has finished.',
printed)
$$
call pretty_email_results();
- Теперь в Snowflake есть « Анонимные процедуры », которые позволяют определять и вызывать их за один шаг. Вам не нужно
create or replace
сначала иcall
позже — что особенно полезно при отладке. - Я решил написать эту процедуру на Python, чтобы использовать библиотеку
tabulate
. Эта библиотека будет форматировать результаты запроса, чтобы они красиво выглядели в нашей исходящей электронной почте. - Библиотека
tabulate
уже предоставлена Anaconda в Snowflake, поэтому мы можем использовать ее со строкойpackages = (‘tabulate’)
. - С помощью
result_scan(last_query_id(-1)
Snowflake можно получить доступ к результатам последнего выполненного запроса, что позволяетpretty_email_results()
отправить по электронной почте любые результаты, полученные на предыдущем шаге. - Snowpark предоставляет нам
session
объект в нашей хранимой процедуре, которую мы можем использовать для выполнения сканирования результатов с помощьюsession.sql()
. session.sql()
дает нам кадр данных Snowpark, а вызов.to_pandas()
его — получение кадра данных usPandas. Затем Pandas'.to_markdown()
получает вызовtabulate
— и он будет жаловаться, если вы явно не запросите этот пакет с помощьюpackages=(...)
.
После завершения интерактивной отладки вы можете переместить код в постоянную хранимую процедуру:
create or replace procedure email_last_results(send_to string, subject string)
returns string
language python
runtime_version=3.8
packages = ('snowflake-snowpark-python', 'tabulate')
handler = 'x'
execute as caller
as
$$
import snowflake
def x(session, send_to, subject):
try:
body = session.sql(
"select * from table(result_scan(last_query_id(-1))) limit 100"
).to_pandas().to_markdown()
except snowflake.snowpark.exceptions.SnowparkSQLException as e:
body = '%s\n%s' % (type(e), e)
session.call('system$send_email',
'my_email_int',
send_to,
subject,
body)
return 'email sent:\n%s' % body
$$;
execute as caller
здесь необходимо, чтобы хранимая процедура могла найти историю запросов пользователя, вызвавшего ее.runtime_version=3.8
должен быть явным в постоянной хранимой процедуре.- Я добавил код, чтобы ловить исключения и в любом случае отправлять красивое электронное письмо.
- Я переместил тему и получателей в аргументы процедуры.
- Я добавил,
limit 100
чтобы предотвратить отправку электронных писем размером в ТБ.
call email_last_results('[email protected]', 'results from snowflake');
Теперь простая хранимая процедура SQL может выполнить запрос и попросить нашу процедуру Python отправить результаты по электронной почте:
create or replace procedure run_query_and_email_it()
returns string
execute as caller
as
$$
begin
-- any query you'd like to run
select 'now' as what, to_char(current_timestamp) as value
union all
select 'upcoming', day || ' ' ||holiday
from table(fh_db.public.holidays('US', [year(current_date), year(current_date)+1]))
where day between current_date() and current_date()+60
;
-- call the stored procedure that formats and emails the results
call email_last_results('[email protected]', 'upcoming holidays');
return 'done';
end
$$
;
- Чтобы получить таблицу со всеми праздниками на следующие 60 дней , он вызывает мою ранее определенную
holidays
UDTF. Генерация всех праздников в SQL — с Python UDTF
call run_query_and_email_it();
У Snowflake есть планировщик задач, готовый к вашим запросам:
create or replace task email_me_frequently
warehouse = 's'
schedule = 'using cron 0 8 * * 1-5 America/Los_Angeles'
as call run_query_and_email_it();
Некоторые примечания:
- Я попросил его использовать мой склад под названием «s». Snowflake также предлагает бессерверные задачи , но они не запускают хранимые процедуры Python.
- Мне пришлось модифицировать свою
holidays()
UDF, переместив.zip
зависимости Python с пользовательского этапа на именованный этап . Когда он жил внутри моей пользовательской стадии, задача выдавала ошибкуRemote file ‘holidays.zip’ was not found
. - Измените расписание на каждое
1 minute
, если вам нужно выполнить отладку.
alter task email_me_frequently resume;
select *
from table(information_schema.task_history(
scheduled_time_range_start=>dateadd('hour',-48,current_timestamp()),
result_limit => 100));
Images generated by AI
Не беспокойтесь слишком сильно о создании системы оповещений — Snowflake уже разрабатывает ее ( в настоящее время в закрытом предварительном просмотре ):
CREATE ALERT Warehouse_Credit_Usage_Alert
WAREHOUSE = my_warehouse
SCHEDULE = ‘USING CRON 0 7 * * *UTC’ // everyday at 7 am
IF (EXISTS (SELECT
Warehouse_name,
SUM(CREDITS_USED) AS credits
FROM snowflake.account_usage.warehouse_metering_history
// aggregate warehouse Credit_used for the past 24 hours
WHERE datediff(hour, start_time, CURRENT_TIMESTAMP ())<=24
GROUP BY 1
HAVING credits > 10
ORDER BY 2 DESC))
THEN system$send_email (
‘My_email_notification_integration’,
‘admin1@company,com, [email protected]’,
‘Email Alert: Excessive warehouse usage!’,
‘Warehouse usage exceeds 10 credits in the past 24 hours’
)
- Попробуйте это с бесплатной пробной учетной записью Snowflake — для начала вам нужен только адрес электронной почты.
- Поиграйте с вариантами форматирования, которые
tabulate
предлагает.