Onramp-Schulung: Data Engineering
Hallo alle miteinander! Da dies mein erster Artikel auf Medium ist, erlauben Sie mir, mich vorzustellen. Ich bin Zara Saldanha, geboren und aufgewachsen in Connecticut. Ich habe 2020 meinen Abschluss in Informatik und Ingenieurwesen an der Bucknell University in Pennsylvania gemacht. Ich habe etwas mehr als zwei Jahre als Data Scientist bei AT&T/DirecTV gearbeitet. Jetzt beginne ich eine Data Engineering-Ausbildung bei Vanguard über Onramp. Ich werde später einen weiteren Artikel über den Interviewprozess schreiben :).
Vor dem Sprung in die Ausbildung bei Vanguard gibt es eine vierwöchige Einarbeitungszeit bei Onramp. In diesem Artikel werde ich durchgehen, woran ich während des Trainings gearbeitet und gelernt habe.
Woche 1
Gleich am ersten Trainingstag haben wir uns darauf eingelassen. Zuerst ein Treffen mit Onramp und unseren Mentoren mit einem Überblick über das Programm. Dann ein Treffen mit Vanguard. Danach hatten wir unser erstes Sprint-Planungsmeeting und bekamen separate Stories zugeteilt, die sich alle am Ende des Sprints integrieren.
Jeder Geschichte waren Ressourcen wie Tutorials, YouTube-Videos und Dokumentationen beigefügt, um die Aufgabe zu erledigen. Ich hatte vor dieser Woche sehr wenig AWS-Erfahrung; abgesehen davon, dass ich in meiner vorherigen Position (S3, Redshift, Athena) die Plattform ein wenig erkundet habe. Die Ressourcen waren in diesem Fall äußerst hilfreich. Ich bin ein ausgesprochener „DIY“-Lerner, daher war das Format des Trainings perfekt für mich.
Ich richtete mein Bitbucket-Konto ein und stellte sicher, dass ich mich auf das OnRamp-Repository festlegen konnte, und begann dann mit meiner ersten Aufgabe!
Aufgabe
- Erstellen Sie eine CloudFormation-Vorlage zum Erstellen einer Lambda-Funktion, die in einen S3-Bucket schreibt.
- Stellen Sie die Python-Lambda-Funktion mithilfe einer Bitbucket-Pipeline bereit.
Nachdem ich mich über AWS Lambda und CloudFormation informiert hatte, erstellte ich als Erstes einen S3-Bucket für die Ausgabe und dann die folgende Beispielfunktion in der AWS Lambda-Konsole. Der Code erstellt eine Textdatei mit der Aufschrift „Hello world!“. und speichert es in dem gerade erstellten s3-Bucket. Nachdem ich es getestet hatte, lud ich die Python-Datei herunter und legte sie in meinem s3-Bucket ab, um sie in der CloudFormation-Vorlage zu verwenden.
import os
import boto3
import io
def lambda_handler(event, context):
bucket_name = event['bucket_name']
file_name = event['file_name']
file = io.BytesIO(bytes('Hello world!', encoding='utf-8'))
s3 = boto3.resource('s3')
bucket = s3.Bucket(bucket_name)
bucket_object = bucket.Object(file_name)
bucket_object.upload_fileobj(file)
Nun zum wichtigeren Teil der Aufgabe: Erstellen einer CloudFormation-Vorlage zum Bereitstellen der Lambda-Funktion. Ich brauchte eine Weile, um das Konzept der Vorlage und des Formats zu verstehen, aber am Ende ergab zum Glück alles einen Sinn.
Die Vorlage besteht aus den folgenden Ressourcen:
- Lambda-Funktion: HelloWorld
- S3-Bucket: LambdaZipsBucket
- IAM-Rolle: HelloWorldRole
AWSTemplateFormatVersion: 2010-09-09
Resources:
HelloWorld:
Type: 'AWS::Lambda::Function'
Properties:
FunctionName: helloWorld
Handler: lambda_function.lambda_handler
Role: !GetAtt HelloWorldRole.Arn
Runtime: python3.9
Code:
S3Bucket: 'myonrampbucket'
S3Key: 'lambda_function.py.zip'
LambdaZipsBucket:
Type: 'AWS::S3::Bucket'
Properties:
BucketName: 'lambdazips2'
HelloWorldRole:
Type: 'AWS::IAM::Role'
Properties:
RoleName: HelloWorldRole
ManagedPolicyArns:
- "arn:aws:iam::aws:policy/CloudWatchFullAccess"
- "arn:aws:iam::aws:policy/AmazonS3FullAccess"
AssumeRolePolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Principal:
Service:
- lambda.amazonaws.com
Action:
- 'sts:AssumeRole'
Path: /
Policies:
- PolicyName: AWSLambdaBasicExecutionRole
PolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action:
- 'logs:CreateLogGroup'
- 'logs:CreateLogStream'
- 'logs:PutLogEvents'
Resource: '*'
pipelines:
default:
- step:
name: Build and package
script:
- apt-get update && apt-get install -y zip
- zip code.zip lambda_function.py
artifacts:
- code.zip
- step:
name: Update Lambda code
script:
- pipe: atlassian/aws-lambda-deploy:0.2.1
variables:
AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID}
AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY}
AWS_DEFAULT_REGION: 'us-west-2'
FUNCTION_NAME: 'helloWorld'
COMMAND: 'update'
ZIP_FILE: 'code.zip'
Woche 2
Diese Woche war kurz, da wir an Thanksgiving zwei Tage frei hatten. Ich war super aufgeregt, in den nächsten Sprint zu springen. AWS und Data Engineering sind für mich eine aufregende neue Welt. Ich wurde für diese Geschichte mit Bethel und Liying zusammengebracht.
Aufgabe
Verarbeiten Sie Rohdaten (CSV oder JSON) aus dem Landing S3-Bucket und speichern Sie sie in einem anderen S3-Bucket in einem Spaltenformat mit Partitionierung.
- Erstellen Sie einen Glue-Job mit PySpark und aktivieren Sie Lesezeichen.
- Lassen Sie den Glue-Job Rohdaten (CSV oder JSON) aus dem Landing-S3-Bucket verarbeiten und speichern Sie sie in einem anderen S3-Bucket in einem Spaltenformat mit Partitionierung: Partitionierung nach Bestandsname und Bucket nach Jahr.
- Glue-Trigger erstellen, um den PySpark-Job automatisch zu starten, wenn neue Dateien im S3-Bucket erstellt werden.
- Stellen Sie sicher, dass die Komponenten mithilfe von CloudFormation bereitgestellt werden können.
Die Daten aus dem Landing-S3-Bucket sehen folgendermaßen aus:
Date,Open,High,Low,Close,Volume,ticker_id,ticker_name
2017-11-22 00:00:00-05:00,28.5271320343017,28.5271320343017,28.5271320343017,28.5271320343017,0,0,VASGX
2017-11-24 00:00:00-05:00,28.5954589843749,28.5954589843749,28.5954589843749,28.595458984375,0,0,VASGX
2017-11-27 00:00:00-05:00,28.5356674194335,28.5356674194335,28.5356674194335,28.5356674194335,0,0,VASGX
2017-11-28 00:00:00-05:00,28.7150325775146,28.7150325775146,28.7150325775146,28.7150325775146,0,0,VASGX
2017-11-29 00:00:00-05:00,28.6637859344482,28.6637859344482,28.6637859344482,28.6637859344482,0,0,VASGX
2017-11-30 00:00:00-05:00,28.757734298706,28.757734298706,28.757734298706,28.757734298706,0,0,VASGX
2017-12-01 00:00:00-05:00,28.7150325775146,28.7150325775146,28.7150325775146,28.7150325775146,0,0,VASGX
2017-12-04 00:00:00-05:00,28.6637859344482,28.6637859344482,28.6637859344482,28.6637859344482,0,0,VASGX
2017-12-05 00:00:00-05:00,28.60400390625,28.60400390625,28.60400390625,28.60400390625,0,0,VASGX
import sys
import boto3
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import col
from pyspark.sql.functions import to_date, split
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
# Create spark cluster
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# Create dynamic frame of CSV from S3 bucket
dynamicFrame = glueContext.create_dynamic_frame.from_options(
connection_type="s3",
connection_options={"paths": ["s3://myonrampbucket/tickers_fct_name.csv"]},
format="csv",
format_options={
"withHeader": True,
# "optimizePerformance": True,
},
)
# Convert dynamic frame to a DataFrame
df = dynamicFrame.toDF()
df.show()
print("Dataframe converted")
# Create 'Year' column from 'date' column
df = df.withColumn("dateAdded", to_date(split(df["Date"], " ").getItem(0)\
.cast("string"), 'yyyy-MM-dd')) \
.withColumn("year", split(col("dateAdded"), "-").getItem(0)) \
.withColumn("month", split(col("dateAdded"), "-").getItem(1)) \
.withColumn("day", split(col("dateAdded"), "-").getItem(2)) \
.withColumn("tickerName",col("ticker_name").cast("string")) \
.orderBy('year')
print("Dataframe columns added and sorted by year.")
print(df.show())
# Partition dataframe by year and ticker name
partitioned_dataframe = df.repartition(col("year"), col("tickerName"))
print("Dataframe repartitioned")
# Convert back to dynamic frame
dynamic_frame_write = DynamicFrame.fromDF(partitioned_dataframe, glueContext, "dynamic_frame_write")
print("Dataframe converted to dynamic frame")
# Save dynamic frame into S3 bucket
glueContext.write_dynamic_frame.from_options(frame=dynamic_frame_write,
connection_type="s3",
connection_options=dict(path="s3://stocks-partitioned/",
partitionKeys=["year", "ticker_name"]),
format="parquet",
transformation_ctx="datasink2")
print("Dynamic frame saved in s3")
# Commit file read to Job Bookmark
job.commit()
print("Job completed!!!")
job.commit()
Auslöser der Lambda-Funktion
Schließlich habe ich eine Lambda-Funktion erstellt, die den Glue-Job mit boto3 jedes Mal aufruft, wenn ein neues Element im Ziel-S3-Bucket erstellt wird. Alle Komponenten in dieser Geschichte können mit CloudFormation bereitgestellt werden.
import json
import urllib.parse
import boto3
from botocore.exceptions import ClientError
print('Loading function')
s3 = boto3.client('s3')
def lambda_handler(event, context):
print("Received event: " + json.dumps(event, indent=2))
# Get the object from the event and show its content type
bucket = event['Records'][0]['s3']['bucket']['name']
key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
try:
response = s3.get_object(Bucket=bucket, Key=key)
print("CONTENT TYPE: " + response['ContentType'])
#return response['ContentType']
except Exception as e:
print(e)
print('Error getting object {} from bucket {}. Make sure they exist and your bucket is in the same region as this function.'.format(key, bucket))
raise e
def run_glue_job(job_name, arguments = {}):
session = boto3.session.Session()
glue_client = session.client('glue')
try:
job_run_id = glue_client.start_job_run(JobName=job_name, Arguments=arguments)
return job_run_id
except ClientError as e:
raise Exception( "boto3 client error in run_glue_job: " + e.__str__())
except Exception as e:
raise Exception( "Unexpected error in run_glue_job: " + e.__str__())
print(run_glue_job("Stocks"))
Mehr neue AWS-Technologien! Diese Woche war ich mit Ashley zusammen.
Aufgabe
- Erstellen Sie einen kleinen EMR-Cluster mit Apache Spark und allen anderen erforderlichen Ökosystem-Tools.
- Der EMR sollte in der Lage sein, PySpark-Anwendungen auszuführen, die S3 lesen und schreiben können. Stellen Sie sicher, dass die IAM-Berechtigungen die geringsten Rechte haben.
- Aktualisieren Sie das PySpark-Skript, um Argumente wie Eingabedatenrahmen und einen Ausgabe-URI zu akzeptieren.
Zuerst habe ich die CSV-Dateien heruntergeladen, die vom PySpark-Skript verarbeitet werden, und sie zum Testen in einen s3-Bucket hochgeladen.
Dann haben wir ein paar Änderungen am PySpark-Skript vorgenommen, das von Liying und Tenzin geschrieben wurde. Wir haben die Hauptfunktion hinzugefügt und einige Änderungen an den Funktionen drop_columns, get_files und process_datasets vorgenommen.
import sys
import argparse
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, isnan, when, count
from pprint import pprint
from pyspark.sql.functions import split
spark = SparkSession.builder.appName('Merge').getOrCreate()
state_names = ["Alaska", "Alabama", "Arkansas", "American Samoa", "Arizona", "California", "Colorado", "Connecticut", "District of Columbia", "Delaware", "Florida", "Georgia", "Guam", "Hawaii", "Iowa", "Idaho", "Illinois", "Indiana", "Kansas", "Kentucky", "Louisiana", "Massachusetts", "Maryland", "Maine", "Michigan", "Minnesota", "Missouri", "Mississippi", "Montana", "North Carolina", "North Dakota", "Nebraska", "New Hampshire", "New Jersey", "New Mexico", "Nevada", "New York", "Ohio", "Oklahoma", "Oregon", "Pennsylvania", "Puerto Rico", "Rhode Island", "South Carolina", "South Dakota", "Tennessee", "Texas", "Utah", "Virginia", "Virgin Islands", "Vermont", "Washington", "Wisconsin", "West Virginia", "Wyoming", "United States"]
unwanted_population_columns = (
'Rural-Urban Continuum Code 2013',
'Population 1990',
...
)
unwanted_education_columns = ()
unwanted_unemployment_columns = (
'Rural_urban_continuum_code_2013',
'Urban_influence_code_2013',
...
)
unwanted_poverty_columns = (
'Rural-urban_Continuum_Code_2003',
'Urban_Influence_Code_2003',
...
)
def get_files(pop_df,edu_df,unemp_df,pov_df):
print("getting files................................")
population_df = spark.read.csv(pop_df, header=True, inferSchema=True)
education_df = spark.read.csv(edu_df, header=True, inferSchema=True)
unemployment_df = spark.read.csv(unemp_df, header=True, inferSchema=True)
poverty_df = spark.read.csv(pov_df, header=True, inferSchema=True)
return [population_df, education_df, unemployment_df, poverty_df]
def drop_columns(dfs):
print("dropping columns.......................................")
population, education, unemployment, poverty = dfs
population = population.drop(*unwanted_population_columns)
education = education.drop(*unwanted_education_columns)
unemployment = unemployment.drop(*unwanted_unemployment_columns)
poverty = poverty.drop(*unwanted_poverty_columns)
return [population, education, unemployment, poverty]
def rename_columns(dfs):
print("renaming columns........................................")
final_dfs = []
for df in dfs:
if 'Area_name' in df.columns:
df = df.withColumnRenamed('Area_name', 'County')
if 'Area name' in df.columns:
df = df.withColumnRenamed('Area name', 'County')
if 'Stabr' in df.columns:
df = df.withColumnRenamed('Stabr', 'State')
if 'FIPS_code' in df.columns:
df = df.withColumnRenamed('FIPS_code', 'FIPS_Code')
# df = check_for_duplicates(df)
df = df.withColumn('County', split(df["County"], ",").getItem(0))
final_dfs.append(df)
return final_dfs
def check_for_null(dfs):
print('checking for null values.....................................')
# population, education, unemployment, poverty = dfs
final_dfs = []
# Find Count of Null, None, NaN and empty values of All DataFrame Columns
for df in dfs:
df.select([count(when(col(c).contains('None'), c)).alias(c) for c in df.columns]).show()
df.select([count(when(col(c).contains('NULL'), c)).alias(c) for c in df.columns]).show()
df.select([count(when((col(c) == '' ), c)).alias(c) for c in df.columns]).show()
df.select([count(when(isnan(c), c)).alias(c) for c in df.columns]).show()
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()
df = df.na.fill('No Record')
final_dfs.append(df)
df.printSchema()
return final_dfs
def check_for_duplicates(dfs):
print('checking for duplicate values.................................')
# create a loop and perform the following for each data set
final_dfs = []
for df in dfs:
count = df.count() - df.dropDuplicates(['County']).count()
print(count)
if count > 0:
df = df.dropDuplicates(['County'])
final_dfs.append(df)
return dfs
def combine_frames(dfs):
print('finally merging the dataframes............................')
population, education, unemployment, poverty = dfs
df1 = population
df1 = df1.join(poverty, ['FIPS_Code', 'State','County'], "outer")
df1 = df1.join(unemployment, ['FIPS_Code', 'State','County'], "outer")
df1 = df1.join(education, ['FIPS_Code','County'], "outer")
return df1
def process_datasets(pop_df,edu_df,unemp_df,pov_df,output_uri):
sys.stdout.write("processing...........................")
df_list = get_files(pop_df,edu_df,unemp_df,pov_df)
sys.stdout.write("files retrieved...........................")
df_post_columns = drop_columns(df_list)
sys.stdout.write("columns dropped...........................")
df_post_renaming = rename_columns(df_post_columns)
sys.stdout.write("columns renamed...........................")
df_post_filter = filter_rows(df_post_renaming)
sys.stdout.write("rows filtered...........................")
df_post_null = check_for_null(df_post_filter)
sys.stdout.write("checking for nulls...........................")
df_post_duplicates = check_for_duplicates(df_post_null)
sys.stdout.write("checking for duplicates...........................")
final_df = combine_frames(df_post_duplicates)
sys.stdout.write("combine frames...........................")
final_df.write.option("header", "true").mode("overwrite").csv(output_uri)
def filter_rows(dfs):
print("filtering data.......................................")
final_dfs = []
for df in dfs:
final_dfs.append(df.filter(~col("County").isin(state_names)))
return final_dfs
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
'--pop_df', help="The URI for population data, like an S3 bucket location.")
parser.add_argument(
'--edu_df', help="The URI for education data, like an S3 bucket location.")
parser.add_argument(
'--unemp_df', help="The URI for unemployment data, like an S3 bucket location.")
parser.add_argument(
'--pov_df', help="The URI for poverty data, like an S3 bucket location.")
parser.add_argument(
'--output_uri', help="The URI where output is saved, like an S3 bucket location.")
args = parser.parse_args()
process_datasets(args.pop_df, args.edu_df, args.unemp_df, args.pov_df, args.output_uri)
Danach haben wir unser EC2-Schlüsselpaar und dann den EMR-Cluster mit Spark-Fähigkeit erstellt. Wir haben einen Schritt zum Ausführen des PySpark-Skripts hinzugefügt und die Argumente bereitgestellt (vier CSV-Dateien und den Ausgabe-URI). Es dauerte etwa eine Minute, um den Schritt abzuschließen, der den Spark-Job auf jeder Datenpartition parallel ausführt, 26 davon, und dann jede Datei in den Ausgabeordner hochlädt.
Woche 4
Die letzte Woche, bevor wir in die Datenwildnis von Vanguard aufbrechen!
In dieser Woche ging es mehr darum, unsere Präsentation des Demografie-Projekts und des Kundenerfahrungsprojekts nächste Woche bei Vanguard vorzubereiten und alle losen Enden zu verknüpfen. Ich wurde beauftragt, alle bisher für das Demografie-Projekt entwickelten Komponenten zu integrieren, um das Projekt automatisch in AWS bereitzustellen und einige End-to-End-Trockenläufe durchzuführen. Ich traf mich mit jeder Gruppe, um ihren Abschnitt des Projekts durchzugehen und dann das Format für die Präsentation zusammenzustellen.
Insgesamt habe ich das Gefühl, dass ich in nur vier Wochen so viel gelernt habe, und ich freue mich sehr darauf, die nächsten 7 Monate damit zu verbringen, noch mehr mit Vanguard zu lernen :).

![Was ist überhaupt eine verknüpfte Liste? [Teil 1]](https://post.nghiatu.com/assets/images/m/max/724/1*Xokk6XOjWyIGCBujkJsCzQ.jpeg)



































