Hej Snowflake, wyślij mi e-mail

Nov 30 2022
Płatek śniegu może teraz wysyłać wiadomości e-mail — utwórzmy więc procedurę składowaną, która każdego ranka wysyła wyniki zapytania — także wtedy, gdy gdzieś na świecie jest święto, które ma wpływ na Twoje liczby. Snowflake może teraz wysyłać e-maile Aby wysłać powiadomienie e-mail za pomocą Snowflake, potrzebujesz teraz tylko jednego wywołania SQL: Celem tego wpisu jest powiązanie tej funkcjonalności z harmonogramem i pewnym kodem formatującym — dzięki czemu każdego ranka od poniedziałku do piątku możemy uzyskać wyniki zapytania SQL, w tym nadchodzące święta.

Płatek śniegu może teraz wysyłać wiadomości e-mail — utwórzmy więc procedurę składowaną, która każdego ranka wysyła wyniki zapytania — także wtedy, gdy gdzieś na świecie jest święto, które ma wpływ na Twoje liczby.

Obraz wygenerowany przez AI

Płatek śniegu może teraz wysyłać e-maile

Aby wysłać powiadomienie e-mail za pomocą Snowflake, potrzebujesz teraz tylko jednego wywołania SQL:

call system$send_email(
    'my_email_int',
    '[email protected], [email protected], [email protected]',
    'This is the subject',
    'This is the body'
);

Jak powiadomić użytkowników w Snowflake? ❄️ Płatek śniegu w pigułce — powiadomienia e-mail

Celem tego wpisu jest powiązanie tej funkcjonalności z harmonogramem i pewnym kodem formatującym — tak więc każdego ranka od poniedziałku do piątku możemy uzyskać wyniki zapytania SQL, w tym nadchodzące święta. Jak ten:

E-mail wysłany przez Snowflake z nadchodzącymi świętami

Krok 1: Opracuj procedurę składowaną, która formatuje i wysyła e-mailem wynik dowolnej kwerendy

To jest kod, który napisałem podczas debugowania moich możliwości wysłania dobrze sformatowanego wyniku zapytania e-mailem:

with pretty_email_results as procedure()
returns string
language python
packages = ('snowflake-snowpark-python', 'tabulate')
handler = 'x'
as
$$
def x(session):
    printed = session.sql(
        "select * from table(result_scan(last_query_id(-1)))"
      ).to_pandas().to_markdown()
    session.call('system$send_email',
        'my_email_int',
        '[email protected]',
        'Email Alert: Task A has finished.',
        printed)
$$
call pretty_email_results();

  • Snowflake ma teraz „ procedury anonimowe ”, które pozwalają zdefiniować i wywołać je w jednym kroku. Nie potrzebujesz create or replacenajpierw, a callpóźniej — co jest szczególnie przydatne podczas debugowania.
  • Zdecydowałem się napisać tę procedurę w Pythonie, aby użyć biblioteki tabulate. Ta biblioteka sformatuje wyniki zapytania, aby wyglądały ładnie na naszej wychodzącej wiadomości e-mail.
  • Biblioteka tabulatejest już dostarczana przez Anacondę w Snowflake, więc możemy jej używać z linią packages = (‘tabulate’).
  • Za pomocą result_scan(last_query_id(-1)Snowflake można uzyskać dostęp do wyników ostatniego wykonanego zapytania — co pozwala pretty_email_results()na wysyłanie e-mailem wszelkich wyników otrzymanych w poprzednim kroku.
  • Snowpark przekazuje nam sessionobiekt w naszej procedurze składowanej, którego możemy użyć do wykonania skanowania wyników za pomocą session.sql().
  • session.sql()daje nam Snowpark DataFrame, a wywołanie .to_pandas()go daje nam Pandas DataFrame. Następnie Pandas .to_markdown()może zadzwonić tabulate— i będzie narzekać, chyba że wyraźnie poprosisz o ten pakiet za pomocą packages=(...).

Po zakończeniu interaktywnego debugowania możesz przenieść kod do stałej procedury składowanej:

create or replace procedure email_last_results(send_to string, subject string)
returns string
language python
runtime_version=3.8
packages = ('snowflake-snowpark-python', 'tabulate')
handler = 'x'
execute as caller
as
$$
import snowflake

def x(session, send_to, subject):
    try: 
        body = session.sql(
          "select * from table(result_scan(last_query_id(-1))) limit 100"
        ).to_pandas().to_markdown()
    except snowflake.snowpark.exceptions.SnowparkSQLException as e:
        body = '%s\n%s' % (type(e), e)
    session.call('system$send_email',
        'my_email_int',
        send_to,
        subject,
        body)
    return 'email sent:\n%s' % body
$$;

  • execute as callerjest tutaj potrzebny, aby procedura składowana mogła znaleźć historię zapytań użytkownika, który ją wywołuje.
  • runtime_version=3.8musi być jawna w stałej procedurze składowanej.
  • Dodałem kod, aby złapać wyjątki i mimo to wysłać ładny e-mail.
  • Temat i adresatów przeniosłem do argumentów proceduralnych.
  • Dodałem, limit 100aby zapobiec wysyłaniu wiadomości e-mail o rozmiarze TB.

call email_last_results('[email protected]', 'results from snowflake');

Teraz prosta procedura składowana SQL może uruchomić zapytanie i poprosić naszą procedurę Pythona o przesłanie wyników e-mailem:

create or replace procedure run_query_and_email_it()
returns string
execute as caller
as
$$
begin
    -- any query you'd like to run
    select 'now' as what, to_char(current_timestamp) as value
    union all
    select 'upcoming', day || ' ' ||holiday
    from table(fh_db.public.holidays('US', [year(current_date), year(current_date)+1]))
    where day between current_date() and current_date()+60
    ;
    -- call the stored procedure that formats and emails the results
    call email_last_results('[email protected]', 'upcoming holidays');
    return 'done';
end
$$
;

call run_query_and_email_it();

Snowflake ma harmonogram zadań gotowy do twoich żądań:

create or replace task email_me_frequently
warehouse = 's'
schedule = 'using cron 0 8 * * 1-5 America/Los_Angeles'
as call run_query_and_email_it();

Niektóre uwagi:

  • Poprosiłem go o skorzystanie z mojego magazynu o nazwie „s”. Snowflake oferuje również Serverless Tasks , ale te nie uruchamiają procedur przechowywanych w Pythonie.
  • Musiałem zmodyfikować mój holidays()UDF, przenosząc .zipzależności Pythona ze etapu użytkownika do nazwanego etapu . Kiedy żył w moim etapie użytkownika, zadanie zgłosiło błąd Remote file ‘holidays.zip’ was not found.
  • Zmień harmonogram na każdy 1 minute, jeśli chcesz debugować.

alter task email_me_frequently resume;

select *
  from table(information_schema.task_history(
    scheduled_time_range_start=>dateadd('hour',-48,current_timestamp()),
    result_limit => 100));

      
                
Images generated by AI

Nie martw się zbytnio budowaniem systemu alertów — Snowflake już to rozwija ( obecnie w prywatnej wersji zapoznawczej ):

CREATE ALERT Warehouse_Credit_Usage_Alert
 WAREHOUSE = my_warehouse
 SCHEDULE = ‘USING CRON 0 7 * * *UTC’ // everyday at 7 am
 IF (EXISTS (SELECT
  Warehouse_name,
  SUM(CREDITS_USED) AS credits
  FROM snowflake.account_usage.warehouse_metering_history
  // aggregate warehouse Credit_used for the past 24 hours
 WHERE datediff(hour, start_time, CURRENT_TIMESTAMP ())<=24
 GROUP BY 1
 HAVING credits > 10
 ORDER BY 2 DESC))
 THEN system$send_email (
  ‘My_email_notification_integration’, 
        ‘admin1@company,com, [email protected]’,
        ‘Email Alert: Excessive warehouse usage!’, 
        ‘Warehouse usage exceeds 10 credits in the past 24 hours’
)

  • Wypróbuj to z bezpłatnym kontem próbnym Snowflake — aby rozpocząć, potrzebujesz tylko adresu e-mail.
  • Graj z opcjami formatowania , które tabulateoferuje.
Obrazy generowane przez AI