Badanie artefaktów opartych na dziennikach
Do tej pory widzieliśmy, jak uzyskać artefakty w systemie Windows za pomocą Pythona. W tym rozdziale dowiemy się o badaniu artefaktów opartych na dziennikach za pomocą języka Python.
Wprowadzenie
Artefakty oparte na dziennikach to skarbnica informacji, która może być bardzo przydatna dla eksperta kryminalistyki cyfrowej. Chociaż mamy różne oprogramowanie monitorujące do zbierania informacji, głównym problemem związanym z analizowaniem przydatnych informacji z nich jest to, że potrzebujemy dużej ilości danych.
Różne artefakty oparte na dziennikach i badanie w języku Python
W tej sekcji omówimy różne artefakty oparte na dziennikach i ich badanie w Pythonie -
Znaczniki czasu
Znacznik czasu zawiera dane i czas aktywności w dzienniku. Jest to jeden z ważnych elementów każdego pliku dziennika. Należy pamiętać, że te wartości danych i czasu mogą mieć różne formaty.
Pokazany poniżej skrypt Pythona przyjmie nieprzetworzoną datę i godzinę jako dane wejściowe i zapewni sformatowany znacznik czasu jako dane wyjściowe.
W przypadku tego skryptu musimy wykonać następujące kroki -
Najpierw ustaw argumenty, które przyjmą surową wartość danych wraz ze źródłem danych i typem danych.
Teraz zapewnij klasę zapewniającą wspólny interfejs dla danych w różnych formatach dat.
Kod w Pythonie
Zobaczmy, jak w tym celu wykorzystać kod Pythona -
Najpierw zaimportuj następujące moduły Pythona -
from __future__ import print_function
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
from datetime import datetime as dt
from datetime import timedelta
Teraz jak zwykle musimy podać argument dla obsługi wiersza poleceń. Tutaj przyjmie trzy argumenty, pierwszy będzie wartością daty do przetworzenia, drugi będzie źródłem tej wartości daty, a trzeci będzie jego typem -
if __name__ == '__main__':
parser = ArgumentParser('Timestamp Log-based artifact')
parser.add_argument("date_value", help="Raw date value to parse")
parser.add_argument(
"source", help = "Source format of date",choices = ParseDate.get_supported_formats())
parser.add_argument(
"type", help = "Data type of input value",choices = ('number', 'hex'), default = 'int')
args = parser.parse_args()
date_parser = ParseDate(args.date_value, args.source, args.type)
date_parser.run()
print(date_parser.timestamp)
Teraz musimy zdefiniować klasę, która będzie akceptować argumenty wartości daty, źródła daty i typu wartości -
class ParseDate(object):
def __init__(self, date_value, source, data_type):
self.date_value = date_value
self.source = source
self.data_type = data_type
self.timestamp = None
Teraz zdefiniujemy metodę, która będzie działać jak kontroler, tak jak metoda main () -
def run(self):
if self.source == 'unix-epoch':
self.parse_unix_epoch()
elif self.source == 'unix-epoch-ms':
self.parse_unix_epoch(True)
elif self.source == 'windows-filetime':
self.parse_windows_filetime()
@classmethod
def get_supported_formats(cls):
return ['unix-epoch', 'unix-epoch-ms', 'windows-filetime']
Teraz musimy zdefiniować dwie metody, które będą przetwarzać odpowiednio czas epoki Unix i czas FILETIME -
def parse_unix_epoch(self, milliseconds=False):
if self.data_type == 'hex':
conv_value = int(self.date_value)
if milliseconds:
conv_value = conv_value / 1000.0
elif self.data_type == 'number':
conv_value = float(self.date_value)
if milliseconds:
conv_value = conv_value / 1000.0
else:
print("Unsupported data type '{}' provided".format(self.data_type))
sys.exit('1')
ts = dt.fromtimestamp(conv_value)
self.timestamp = ts.strftime('%Y-%m-%d %H:%M:%S.%f')
def parse_windows_filetime(self):
if self.data_type == 'hex':
microseconds = int(self.date_value, 16) / 10.0
elif self.data_type == 'number':
microseconds = float(self.date_value) / 10
else:
print("Unsupported data type '{}' provided".format(self.data_type))
sys.exit('1')
ts = dt(1601, 1, 1) + timedelta(microseconds=microseconds)
self.timestamp = ts.strftime('%Y-%m-%d %H:%M:%S.%f')
Po uruchomieniu powyższego skryptu, podając znacznik czasu możemy uzyskać przekonwertowaną wartość w łatwym do odczytania formacie.
Dzienniki serwera sieci Web
Z punktu widzenia eksperta kryminalistyki cyfrowej, dzienniki serwera WWW są kolejnym ważnym artefaktem, ponieważ mogą uzyskać przydatne statystyki użytkownika wraz z informacjami o użytkowniku i lokalizacji geograficznej. Poniżej znajduje się skrypt w języku Python, który po przetworzeniu dzienników serwera WWW utworzy arkusz kalkulacyjny w celu łatwej analizy informacji.
Przede wszystkim musimy zaimportować następujące moduły Pythona -
from __future__ import print_function
from argparse import ArgumentParser, FileType
import re
import shlex
import logging
import sys
import csv
logger = logging.getLogger(__file__)
Teraz musimy zdefiniować wzorce, które będą analizowane z dzienników -
iis_log_format = [
("date", re.compile(r"\d{4}-\d{2}-\d{2}")),
("time", re.compile(r"\d\d:\d\d:\d\d")),
("s-ip", re.compile(
r"((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)(\.|$)){4}")),
("cs-method", re.compile(
r"(GET)|(POST)|(PUT)|(DELETE)|(OPTIONS)|(HEAD)|(CONNECT)")),
("cs-uri-stem", re.compile(r"([A-Za-z0-1/\.-]*)")),
("cs-uri-query", re.compile(r"([A-Za-z0-1/\.-]*)")),
("s-port", re.compile(r"\d*")),
("cs-username", re.compile(r"([A-Za-z0-1/\.-]*)")),
("c-ip", re.compile(
r"((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)(\.|$)){4}")),
("cs(User-Agent)", re.compile(r".*")),
("sc-status", re.compile(r"\d*")),
("sc-substatus", re.compile(r"\d*")),
("sc-win32-status", re.compile(r"\d*")),
("time-taken", re.compile(r"\d*"))]
Teraz podaj argument do obsługi wiersza poleceń. Tutaj przyjmie dwa argumenty, pierwszy będzie logiem IIS do przetworzenia, a drugi będzie wymaganą ścieżką pliku CSV.
if __name__ == '__main__':
parser = ArgumentParser('Parsing Server Based Logs')
parser.add_argument('iis_log', help = "Path to IIS Log",type = FileType('r'))
parser.add_argument('csv_report', help = "Path to CSV report")
parser.add_argument('-l', help = "Path to processing log",default=__name__ + '.log')
args = parser.parse_args()
logger.setLevel(logging.DEBUG)
msg_fmt = logging.Formatter(
"%(asctime)-15s %(funcName)-10s ""%(levelname)-8s %(message)s")
strhndl = logging.StreamHandler(sys.stdout)
strhndl.setFormatter(fmt = msg_fmt)
fhndl = logging.FileHandler(args.log, mode = 'a')
fhndl.setFormatter(fmt = msg_fmt)
logger.addHandler(strhndl)
logger.addHandler(fhndl)
logger.info("Starting IIS Parsing ")
logger.debug("Supplied arguments: {}".format(", ".join(sys.argv[1:])))
logger.debug("System " + sys.platform)
logger.debug("Version " + sys.version)
main(args.iis_log, args.csv_report, logger)
iologger.info("IIS Parsing Complete")
Teraz musimy zdefiniować metodę main (), która będzie obsługiwać skrypt dla zbiorczych informacji o logach -
def main(iis_log, report_file, logger):
parsed_logs = []
for raw_line in iis_log:
line = raw_line.strip()
log_entry = {}
if line.startswith("#") or len(line) == 0:
continue
if '\"' in line:
line_iter = shlex.shlex(line_iter)
else:
line_iter = line.split(" ")
for count, split_entry in enumerate(line_iter):
col_name, col_pattern = iis_log_format[count]
if col_pattern.match(split_entry):
log_entry[col_name] = split_entry
else:
logger.error("Unknown column pattern discovered. "
"Line preserved in full below")
logger.error("Unparsed Line: {}".format(line))
parsed_logs.append(log_entry)
logger.info("Parsed {} lines".format(len(parsed_logs)))
cols = [x[0] for x in iis_log_format]
logger.info("Creating report file: {}".format(report_file))
write_csv(report_file, cols, parsed_logs)
logger.info("Report created")
Na koniec musimy zdefiniować metodę, która zapisze dane wyjściowe do arkusza kalkulacyjnego -
def write_csv(outfile, fieldnames, data):
with open(outfile, 'w', newline="") as open_outfile:
csvfile = csv.DictWriter(open_outfile, fieldnames)
csvfile.writeheader()
csvfile.writerows(data)
Po uruchomieniu powyższego skryptu otrzymamy dzienniki oparte na serwerze WWW w arkuszu kalkulacyjnym.
Skanowanie ważnych plików za pomocą YARA
YARA (Yet Another Recursive Algorithm) to narzędzie do dopasowywania wzorców przeznaczone do identyfikacji złośliwego oprogramowania i reagowania na incydenty. Do skanowania plików użyjemy YARA. W poniższym skrypcie Pythona użyjemy YARA.
YARA możemy zainstalować za pomocą następującego polecenia -
pip install YARA
Możemy postępować zgodnie z instrukcjami podanymi poniżej, aby używać reguł YARA do skanowania plików -
Najpierw skonfiguruj i skompiluj reguły YARA
Następnie przeskanuj pojedynczy plik, a następnie iteruj po katalogach, aby przetworzyć poszczególne pliki.
Na koniec wyeksportujemy wynik do CSV.
Kod w Pythonie
Zobaczmy, jak w tym celu wykorzystać kod Pythona -
Najpierw musimy zaimportować następujące moduły Pythona -
from __future__ import print_function
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
import os
import csv
import yara
Następnie podaj argument dla obsługi wiersza poleceń. Zauważ, że w tym przypadku przyjmie dwa argumenty - pierwszy to ścieżka do reguł YARA, a drugi to plik do przeskanowania.
if __name__ == '__main__':
parser = ArgumentParser('Scanning files by YARA')
parser.add_argument(
'yara_rules',help = "Path to Yara rule to scan with. May be file or folder path.")
parser.add_argument('path_to_scan',help = "Path to file or folder to scan")
parser.add_argument('--output',help = "Path to output a CSV report of scan results")
args = parser.parse_args()
main(args.yara_rules, args.path_to_scan, args.output)
Teraz zdefiniujemy funkcję main (), która zaakceptuje ścieżkę do reguł yara i pliku do przeskanowania -
def main(yara_rules, path_to_scan, output):
if os.path.isdir(yara_rules):
yrules = yara.compile(yara_rules)
else:
yrules = yara.compile(filepath=yara_rules)
if os.path.isdir(path_to_scan):
match_info = process_directory(yrules, path_to_scan)
else:
match_info = process_file(yrules, path_to_scan)
columns = ['rule_name', 'hit_value', 'hit_offset', 'file_name',
'rule_string', 'rule_tag']
if output is None:
write_stdout(columns, match_info)
else:
write_csv(output, columns, match_info)
Teraz zdefiniuj metodę, która będzie iterować po katalogu i przekaże wynik do innej metody w celu dalszego przetwarzania -
def process_directory(yrules, folder_path):
match_info = []
for root, _, files in os.walk(folder_path):
for entry in files:
file_entry = os.path.join(root, entry)
match_info += process_file(yrules, file_entry)
return match_info
Następnie zdefiniuj dwie funkcje. Zauważ, że najpierw użyjemymatch() metoda do yrulesobiekt, a inny zgłosi pasujące informacje do konsoli, jeśli użytkownik nie określi żadnego pliku wyjściowego. Przestrzegaj kodu pokazanego poniżej -
def process_file(yrules, file_path):
match = yrules.match(file_path)
match_info = []
for rule_set in match:
for hit in rule_set.strings:
match_info.append({
'file_name': file_path,
'rule_name': rule_set.rule,
'rule_tag': ",".join(rule_set.tags),
'hit_offset': hit[0],
'rule_string': hit[1],
'hit_value': hit[2]
})
return match_info
def write_stdout(columns, match_info):
for entry in match_info:
for col in columns:
print("{}: {}".format(col, entry[col]))
print("=" * 30)
Na koniec zdefiniujemy metodę, która zapisze dane wyjściowe do pliku CSV, jak pokazano poniżej -
def write_csv(outfile, fieldnames, data):
with open(outfile, 'w', newline="") as open_outfile:
csvfile = csv.DictWriter(open_outfile, fieldnames)
csvfile.writeheader()
csvfile.writerows(data)
Po pomyślnym uruchomieniu powyższego skryptu możemy podać odpowiednie argumenty w wierszu poleceń i wygenerować raport w formacie CSV.