Hola copo de nieve, envíame un correo electrónico
Snowflake puede enviar correos electrónicos ahora, así que construyamos un procedimiento almacenado que le envíe los resultados de una consulta todas las mañanas, incluso si hay un día festivo en algún lugar del mundo que esté afectando sus números.
Snowflake puede enviar correos electrónicos ahora
Para enviar una notificación por correo electrónico con Snowflake, ahora solo necesita una llamada SQL:
call system$send_email(
'my_email_int',
'[email protected], [email protected], [email protected]',
'This is the subject',
'This is the body'
);
¿Cómo notificar a los usuarios en Snowflake? ❄️ Copo de nieve en pocas palabras: notificaciones por correo electrónico
El objetivo de esta publicación es vincular esta funcionalidad con un programador y algún código de formato, de modo que todas las mañanas, de lunes a viernes, podamos obtener los resultados de una consulta SQL, incluidos los próximos días festivos. Como éste:

Paso 1: Desarrolle un procedimiento almacenado que formatee y envíe por correo electrónico el resultado de cualquier consulta
Este es el código que escribí, mientras depuraba mis habilidades para enviar por correo electrónico un resultado de consulta bien formateado:
with pretty_email_results as procedure()
returns string
language python
packages = ('snowflake-snowpark-python', 'tabulate')
handler = 'x'
as
$$
def x(session):
printed = session.sql(
"select * from table(result_scan(last_query_id(-1)))"
).to_pandas().to_markdown()
session.call('system$send_email',
'my_email_int',
'[email protected]',
'Email Alert: Task A has finished.',
printed)
$$
call pretty_email_results();
- Snowflake ahora tiene " Procedimientos anónimos " que le permiten definirlos y llamarlos en un solo paso. No necesita
create or replace
primero ycall
después, lo cual es especialmente útil durante la depuración. - Elegí escribir este procedimiento en Python, para usar la biblioteca
tabulate
. Esta biblioteca formateará los resultados de una consulta, para que se vean bien en nuestro correo electrónico saliente. - Anaconda ya proporciona la
tabulate
biblioteca en Snowflake, por lo que podemos usarla con la líneapackages = (‘tabulate’)
. - Con
result_scan(last_query_id(-1)
Snowflake puede acceder a los resultados de la última consulta ejecutada, lo que permitepretty_email_results()
enviar por correo electrónico los resultados que obtuvimos en el paso anterior. - Snowpark nos da el
session
objeto en nuestro procedimiento almacenado, que podemos usar para ejecutar el escaneo de resultados consession.sql()
. session.sql()
nos da un Snowpark DataFrame, y llamarlo.to_pandas()
nos da Pandas DataFrame. Luego , Pandas.to_markdown()
llamatabulate
, y se quejará a menos que no solicite explícitamente este paquete conpackages=(...)
.
Una vez que se realiza la depuración interactiva, puede mover el código a un procedimiento almacenado permanente:
create or replace procedure email_last_results(send_to string, subject string)
returns string
language python
runtime_version=3.8
packages = ('snowflake-snowpark-python', 'tabulate')
handler = 'x'
execute as caller
as
$$
import snowflake
def x(session, send_to, subject):
try:
body = session.sql(
"select * from table(result_scan(last_query_id(-1))) limit 100"
).to_pandas().to_markdown()
except snowflake.snowpark.exceptions.SnowparkSQLException as e:
body = '%s\n%s' % (type(e), e)
session.call('system$send_email',
'my_email_int',
send_to,
subject,
body)
return 'email sent:\n%s' % body
$$;
execute as caller
se necesita aquí para que el procedimiento almacenado pueda encontrar el historial de consultas del usuario que lo llama.runtime_version=3.8
debe ser explícito en un procedimiento almacenado permanente.- Agregué código para detectar excepciones y enviar un correo electrónico bonito de todos modos.
- Moví el sujeto y los destinatarios a los argumentos del procedimiento.
- Agregué un
limit 100
para evitar el envío de correos electrónicos del tamaño de TB.
call email_last_results('[email protected]', 'results from snowflake');
Ahora, un simple procedimiento almacenado de SQL puede ejecutar una consulta y pedirle a nuestro procedimiento de Python que envíe los resultados por correo electrónico:
create or replace procedure run_query_and_email_it()
returns string
execute as caller
as
$$
begin
-- any query you'd like to run
select 'now' as what, to_char(current_timestamp) as value
union all
select 'upcoming', day || ' ' ||holiday
from table(fh_db.public.holidays('US', [year(current_date), year(current_date)+1]))
where day between current_date() and current_date()+60
;
-- call the stored procedure that formats and emails the results
call email_last_results('[email protected]', 'upcoming holidays');
return 'done';
end
$$
;
- Para obtener una tabla con todos los días festivos de los próximos 60 días , llama a mi UDTF previamente definido .
holidays
Generación de todos los días festivos en SQL, con Python UDTF
call run_query_and_email_it();
Snowflake tiene un programador de tareas listo para tus solicitudes:
create or replace task email_me_frequently
warehouse = 's'
schedule = 'using cron 0 8 * * 1-5 America/Los_Angeles'
as call run_query_and_email_it();
Algunas notas:
- Le pedí que usara mi almacén llamado "s". Snowflake también ofrece tareas sin servidor , pero esas no ejecutan procedimientos almacenados de Python.
- Tuve que modificar mi
holidays()
UDF moviendo las.zip
dependencias de Python de una etapa de usuario a una etapa con nombre . Cuando vivía dentro de mi etapa de usuario, la tarea arrojó el errorRemote file ‘holidays.zip’ was not found
. - Cambie la programación a cada
1 minute
si necesita depurar.
alter task email_me_frequently resume;
select *
from table(information_schema.task_history(
scheduled_time_range_start=>dateadd('hour',-48,current_timestamp()),
result_limit => 100));
Images generated by AI
No se preocupe demasiado por crear un sistema de alerta : Snowflake ya lo está desarrollando ( actualmente en versión preliminar privada ):
CREATE ALERT Warehouse_Credit_Usage_Alert
WAREHOUSE = my_warehouse
SCHEDULE = ‘USING CRON 0 7 * * *UTC’ // everyday at 7 am
IF (EXISTS (SELECT
Warehouse_name,
SUM(CREDITS_USED) AS credits
FROM snowflake.account_usage.warehouse_metering_history
// aggregate warehouse Credit_used for the past 24 hours
WHERE datediff(hour, start_time, CURRENT_TIMESTAMP ())<=24
GROUP BY 1
HAVING credits > 10
ORDER BY 2 DESC))
THEN system$send_email (
‘My_email_notification_integration’,
‘admin1@company,com, [email protected]’,
‘Email Alert: Excessive warehouse usage!’,
‘Warehouse usage exceeds 10 credits in the past 24 hours’
)
- Pruébelo con una cuenta de prueba gratuita de Snowflake : solo necesita una dirección de correo electrónico para comenzar.
- Juega con las opciones de formato que
tabulate
ofrece.