Implementierung der Git-Zip / Tar-Datei vor dem Festschreiben und nach dem Auschecken

Dec 22 2020

Ich benutze regelmäßig ein Tool (Amesim), das seine Dateien in eine unkomprimierte TAR-Datei packt. Für die Versionierung habe ich Dateien normalerweise als file1_Rev01.ame bezeichnet und Änderungen vorgenommen. Dies funktioniert, wenn ich der einzige Benutzer bin, aber in letzter Zeit teile ich Dateien / Modelle regelmäßiger. Der Versuch, diese Modelle gemeinsam zu nutzen, ist schmerzhaft. Sie enthalten häufig Ergebnisse, die sehr umfangreich sind (gbs Daten), und verfolgen Änderungen zwischen den Versionen, wenn dies schwierig ist, es sei denn, Sie fügen bei jeder Änderung konsequent Text in das Modell ein. (Amesim ist ein Tool wie Simulink.)

Ich habe mich über Git-Hooks und Git-Filter informiert, bin mir aber nicht sicher, was ich tun soll, um die Versionierung eines Tarballs besser zu verwalten.

Angenommen, ich habe die Datei "my_file.tar" und sie besteht aus a.txt, b.model, c.data und d.results.

Von der Anwendungsseite aus würde ich "my_file.tar" bereitstellen und ein Commit "Updates to model" senden. Ohne Änderungen an git werden die Änderungen an einer Binärdatei verfolgt. Dies ist nicht lesbar und belegt viel Platz. Wenn Ergebnisse enthalten sind, ist die Datei ziemlich groß. Das Klonen des Repos ist eine Herausforderung, wenn die Ergebnisse kontinuierlich gespeichert werden.

Bei meinem ersten Versuch habe ich versucht, Pre-Commit- und Post-Checkout-Hooks zu verwenden.

Beim Festschreiben entpackt mein Pre-Commit-Hook "my_file.tar" in ein Verzeichnis "my_file_tar". Es entfernt die * .results-Datei, die beim Ausführen des Modells entsteht. Es ist nicht notwendig, dies zu verfolgen und spart viel Platz (gbs).

Wenn ich das Modell ziehe, sucht Post-Checkout nach Ordnern mit _tar und tariert sie und benennt sie in my_file.tar um.

Nun funktioniert das im Allgemeinen. Aber wie soll ich mit my_file.tar und unkomprimierten Ordnern umgehen? Wenn ich den unkomprimierten Ordner nach dem Auschecken automatisch lösche, gibt git an, dass ich wesentliche Änderungen zu verfolgen habe. Muss ich den Ordner jedes Mal zu .gitignore hinzufügen / entfernen? Außerdem zeigt die TAR-Datei niemals an, dass sie verfolgt wird, da ich sie im Pre-Commit-Code entfernt habe. Was kann ich tun, um diesen Prozess zu bereinigen? Wie soll ich anders damit umgehen?

Verweise:

  • Git Zip-Dateien
  • Verschmieren und reinigen
  • Git Office-Dokumente
  • Zippey
  • XLTrail

Für diesen Code ist .ame eine TAR-Datei.

Pre-Commit

#!/usr/bin/env python

import argparse
import os
import tarfile
import zipfile
import subprocess

def parse_args():
    pass

def log_file(log_item):
    cwd = os.getcwd()
    file = open("MyFile.txt", "a") # Open file in append mode
    file.write(log_item + '\n')
    return 1
    
def get_staged_ame_files():
    '''Request a list of staged files from git and return a list of *.ame files

    This function opens a subprocess with git, requests a list of names in the git staged list. It will return a list of strings.
    '''
    out = subprocess.Popen(['git', 'diff', '--staged', '--name-only'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    stdout, stderr = out.communicate()
    # Separate output by newlines
    # staged_files = stdout.split(b'\n') # split as bytes
    
    # filter for files with .ame 
    staged_files = stdout.decode('utf-8').split('\n') # split as strings
    # Create list of *just* amesim files
    staged_ame_files = []
    for entry in staged_files:
        if entry.endswith(".ame"):
            staged_ame_files.append(entry)
    
    if not staged_ame_files:
        return None
    else:
        return staged_ame_files

def extract_ame_files(file_list):
    folder_list = []
    for list_item in file_list:
        # If file exists, extract it. Else continue.
        if os.path.isfile(list_item):
            tar = tarfile.open(list_item, "r:")
            folder_name = list_item[0:-4] + "_ame"
            folder_list.append(folder_name)
            tar.extractall(path = folder_name)
            tar.close()
            log_file(folder_name)
        else:
            print("File {} does not exist.".format(list_item))
            
    return folder_list
    

def cleanup_ame_ignored_files(folder_list):
    '''Removes unecessary files from the folder. 
    
    '''
    for folder in folder_list:
        file_list = os.listdir(folder)
        for file in file_list:
            if item.endswith(".results"):
                os.remove(item)
            if item.endswith(".exe"):
                os.remove(item)
    return 1


def git_add_ame_folders(folders):
    # Add *_ame folders to git stage
    for folder in folders:
        out = subprocess.Popen(['git', 'add', folder + '/'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        stdout, stderr = out.communicate()
        # The -u will capture removed files?
        out = subprocess.Popen(['git', 'add', '-u', folder + '/'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        stdout, stderr = out.communicate()
        
        log_file(stdout.decode('utf-8'))
    return 1
    
def remove_ame_from_staging(file_list):
    # Loop through any staged ame files.
    for file in file_list:
        out = subprocess.Popen(['git', 'rm', '--cached', file], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        stdout, stderr = out.communicate()
    return 1

def main(args=None):
    # if file name is *.ame
    # extract *.ame as a tar of the same name into a folder of the same name + _ame
    # delete .results file
    # don't commit .ame file 
    
    # Search for files we want to process in the staged list
    # These will only be *.ame files.
    staged_ame_files = get_staged_ame_files()
    if not staged_ame_files:
        # If its empty, there's nothing to do. End the function.
        return 0
    
    # We're not empty, lets extract each one.
    folder_list = extract_ame_files(staged_ame_files)
    
    # Delete all .results files in each extracted folder  path
    
    # Stage all files in each folder path 
    git_add_ame_folders(folder_list)
    
    # Unstage the .ame file
    remove_ame_from_staging(staged_ame_files)
    return 1

if __name__ == "__main__":
    args = parse_args()
    main(args)

und nach dem Auschecken

#!/usr/bin/env python

import argparse
import os
import tarfile
import zipfile
import subprocess
import shutil
#from shutil import rmtree # Delete directory trees

def parse_args():
    pass

def log_file(log_item):
    cwd = os.getcwd()
    file = open("MyFile2.txt", "a") # Open file in append mode
    file.write(log_item + '\n')
    return 1
    
def compress_ame_files(folder_list):
    for list_item in folder_list:
        log_file("We're on item {}".format(list_item))
        file_name = list_item[0:-4] + ".ame"
        log_file("Tar file name {}".format(file_name))
        # Delete the file if it exists first.
        os.remove(file_name)
        with tarfile.open(file_name, "w:") as tar:
            tar.add(list_item, arcname=os.path.basename('../'))
    return 1
    

def cleanup_ame_ignored_files(folder_list):
    '''Removes unecessary files from the folder. 
    
    '''
    for folder in folder_list:
        file_list = os.listdir(folder)
        for file in file_list:
            if item.endswith(".results"):
                os.remove(item)
            if item.endswith(".exe"):
                os.remove(item)
    return 1


def git_add_ame_folders(folders):
    # Add *_ame folders to git stage
    for folder in folders:
        out = subprocess.Popen(['git', 'add', folder + '/'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        stdout, stderr = out.communicate()
        # The -u will capture removed files?
        out = subprocess.Popen(['git', 'add', '-u', folder + '/'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        stdout, stderr = out.communicate()
        
        #log_file(stdout.decode('utf-8'))
    return 1
    
def remove_ame_from_staging(file_list):
    # Loop through any staged ame files.
    for file in file_list:
        out = subprocess.Popen(['git', 'rm', '--cached', file], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        stdout, stderr = out.communicate()
    return 1

def fast_scandir(dirname):
    # https://stackoverflow.com/questions/973473/getting-a-list-of-all-subdirectories-in-the-current-directory?rq=1
    subfolders= [f.path for f in os.scandir(dirname) if f.is_dir()]
    for dirname in list(subfolders):
        subfolders.extend(fast_scandir(dirname))
    return subfolders

def delete_ame_folders(folders):
    for folder in folders:
        try:
            shutil.rmtree(folder)
        except OSError as e:
            print("Error: %s : %s" % (dir_path, e.strerror))
    return 1
    
#def main(args=None):
def main(lines):
    print("Post checkout running.")
    # find folders with the name _ame
    #log_file("We're running.")
    folder_list = []
    for folder in fast_scandir(os.getcwd()):
        if folder.endswith("_ame"):
            #log_file("Found folder {}.".format(folder))
            folder_list.append(os.path.join(os.getcwd(), folder))
    # tar each folder up and rename with .ame
    compress_ame_files(folder_list)
    
    # Delete the folders
    #delete_ame_folders(folder_list)

    return 1

if __name__ == "__main__":
    args = parse_args()
    main(args)

Antworten

3 Rukie Dec 31 2020 at 00:46

Der Code in dieser Antwort implementiert einen Git-Filter im Gegensatz zu einem Pre-Commit-Hook und einem Post-Checkout-Hook in der Frage. Der Vorteil des Filters besteht darin, dass nur eine Datei bearbeitet wird. Zusätzliche Dateien müssen nicht separat verfolgt und festgeschrieben / abgerufen werden. Ähnlich wie bei Zippey wird ein unkomprimierter Datenstrom erstellt und unnötige Dateien auf dem Weg entfernt.

Hinweis: Verwenden Sie keine print-Anweisungen, da dies den stdout-Stream im Git-Filter beeinträchtigt. Dies war eine schmerzhafte Lektion.

Hinweis: CRLF- und LF-Endungen sind ein Problem. Beim Dekodieren vom ersten Git-Pull musste ich die Zeilenenden bereinigen, da Sourcetree / Git in das Windows-Format konvertiert wurde.

Diskussion der Lösung:

Da es sich bei der Datei, mit der ich arbeite, um einen unkomprimierten Teer handelt, wurde die Zippey-Lösung nicht direkt angewendet. Zippey ist nur für Zip-Dateien. Ich habe stattdessen Zippeys Technik mit TAR-Dateien implementiert.

Beim Festschreiben wird ein sauberer Filer angewendet, der die TAR-Datei "codiert". Die Codierungsfunktion nimmt jede Datei und zeichnet die Länge der Daten, die Rohlänge der Daten, wenn sie binär sind, den Speichermodus (ASCII oder binär) und den Dateinamen auf.

Das Codierungsskript überträgt alle Dateien in einer einzigen Datei mit demselben Namen in einem unkomprimierten Format. Binärdateien sind base64-codiert in einer einzelnen Zeile, wodurch Unterschiede leichter zu lesen sind.

Während der Codierung werden Dateien mit bestimmten Erweiterungen vermieden (wie Ergebnisdateien).

Beim Ziehen dekomprimiert ein Wischfilter die Datei, indem die vier Meta-Tags zum Lesen der Informationen verwendet werden. Jede Datei wird verarbeitet und einem TAR-Dateiobjekt hinzugefügt, und am Ende wird eine TAR-Datei ausgeschrieben.

Wie bei Zippey wird auf einem neuen Klon des Repositorys eine codierte Datei abgerufen, die für mein Tool nicht lesbar ist. Daher sucht Clone Setup nach meinen * .ame-Dateien, die codiert und decodiert sind, und richtet geeignete Git-Filter ein.

Da ich sowohl auf Linux- als auch auf Windows-Computern arbeite und Git dazu neigt, beim Auschecken CRLF hinzuzufügen, stellen die Skripte sicher, dass CRLF vor dem Codieren entfernt und CRLF vor dem Decodieren aus codierten Dateien entfernt wird.

amefilter.py

import tarfile
import sys
import io
import base64
import string
import tempfile
import os.path

DEBUG_AME_FILTER = False
NAME = 'Amesim_Git'
ENCODING = 'UTF-8'

W_EOL = b'\r\n'
U_EOL = b'\n'

# decompress these defined files
AME_EXTENSIONS = ['.amegp', '.cir', '.sad', '.units', '.views', '.xml']
ASCII_EXTENSIONS = ['.txt', '.py']
# Do not include these files in tracking. 
EXCLUDE = ['.results']

def debug(msg):
    '''Print debug message'''
    if DEBUG_AME_FILTER:
        sys.stderr.write('{0}: debug: {1}\n'.format(NAME, msg))

def error(msg):
    '''Print error message'''
    sys.stderr.write('{0}: error: {1}\n'.format(NAME, msg))

def init():
    '''Initialize writing; set binary mode for windows'''
    debug("Running on {}".format(sys.platform))
    if sys.platform.startswith('win'):
        import msvcrt
        debug("Enable Windows binary workaround")
        msvcrt.setmode(sys.stdin.fileno(), os.O_BINARY)
        msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)

def encode(input, output):
    '''Encode into special VCS friendly format from input to output'''
    debug("ENCODE was called")
    # Create a temporary file based off of the input AME file
    # This lets tarfile access a binary file object
    tfp = tempfile.TemporaryFile(mode='w+b')
    # Write contents into temporary file
    tfp.write(input.read())
    tfp.seek(0)  # Make sure tarfile reads from the start, otherwise object is empty
    tar = tarfile.open(fileobj=tfp, mode='r:')
    # Loop through objects within tar file
    for name in tar.getnames():
        # Get the file name of each object.
        tarinfo = tar.getmember(name)
        if tarinfo.isdir():
            continue # Skip folders, not sure how to handle encode/decode yet.
        data = tar.extractfile(name).read()
        
        # List of ASCII files to decode and version control
        text_extensions = list(set(AME_EXTENSIONS).union(set(ASCII_EXTENSIONS)))
        
        # Isolate extension.
        extension = os.path.splitext(name)[1][1:].strip().lower()
        # Amesim may store batched simulations as *.results.1, *.results.2, remove numeric endings and identify the real ending.
        if extension.isnumeric():
            root_name = os.path.splitext(name)[0][0:]
            real_extension = os.path.splitext(root_name)[1][1:].strip().lower()
            if real_extension in EXCLUDE:
                continue  # Skip excluded extensions
            
        if extension in EXCLUDE:
            continue  # Skip excluded extensions.
            
        # Encode the defined extensions in UTF-8
        try:
            # Check if text data
            data.decode(ENCODING)
            data = data.replace(W_EOL, U_EOL)  # Fix line endings
            try:
                strdata = map(chr, data)
            except TypeError:
                strdata = data
            if extension not in text_extensions and not all(c in string.printable for c in strdata):
                # File is not ascii, append binary file.
                raise UnicodeDecodeError(ENCODING, "".encode(ENCODING), 0, 1, "Artificial exception")

            # Encode
            debug("Appending text file '{}'".format(name))
            mode = 'A'  # ASCII Mode
            output.write("{}|{}|{}|{}\n".format(len(data), len(data), mode, name).encode(ENCODING))
            output.write(data)
            output.write("\n".encode(ENCODING)) # Separation from next meta line
        except UnicodeDecodeError:
            # Binary data
            debug("Appending binary file '{}'".format(name))
            mode = 'B'  # Binary Mode
            raw_len = len(data)
            data = base64.b64encode(data)
            output.write("{}|{}|{}|{}\n".format(len(data), raw_len, mode, name).encode(ENCODING))
            output.write(data)
            output.write("\n".encode(ENCODING))  # Separation from next meta line
    tar.close()

def decode(input, output):
    '''Decode from special VCS friendly format from input to output'''
    debug("DECODE was called")
    tfp = tempfile.TemporaryFile(mode='w+b')
    tar = tarfile.open(fileobj=tfp, mode='w:')
    #input = io.open(input, 'rb')
    while True:
        meta = input.readline().decode(ENCODING)
        if not meta:
            break
        #print(meta)
        (data_len, raw_len, mode, name) = [t(s) for (t, s) in zip((int, int, str, str), meta.split('|'))]
        #print('Data length:{}'.format(data_len))
        #print('Mode: {}'.format(mode))
        #print('Name: {}'.format(name))
        if mode == 'A':
            #print('Appending ascii data')
            debug("Appending text file '{}'".format(name))
            #https://stackoverflow.com/questions/740820/python-write-string-directly-to-tarfile
            info = tarfile.TarInfo(name=name.rstrip())
            info.size = raw_len
            raw_data = input.read(data_len)
            binary_data = io.BytesIO(raw_data)
            # Add each file object to our tarball
            tar.addfile(tarinfo=info, fileobj=binary_data)
            input.read(1) # Skip last '\n'
        elif mode == 'B':
            #print('Appending binary data')
            debug("Appending binary file '{}'".format(name.rstrip()))

            info = tarfile.TarInfo(name=name.rstrip())
            info.size = raw_len
            raw_data = input.read(data_len)
            decoded_data = base64.b64decode(raw_data)
            binary_data = io.BytesIO(decoded_data)
            tar.addfile(tarinfo=info, fileobj=binary_data)
            input.read(1) # Skip last '\n'
        else:
            # Should never reach here
            tar.close()
            tfp.close()
            error('Illegal mode "{}"'.format(mode))
            sys.exit(1)

    # Flush all writes
    tar.close()

    # Write output
    tfp.seek(0) # Go to the start of our temporary file
    output.write(tfp.read())
    tfp.close()

def main():
    '''Main program'''
    #import codecs
    #sys.stdout = codecs.getwriter('utf8')(sys.stdout)
    init()
    input = io.open(sys.stdin.fileno(), 'rb')
    output = io.open(sys.stdout.fileno(), 'wb')
    if len(sys.argv) < 2 or sys.argv[1] == '-' or sys.argv[1] == '--help':
        # This is wrong
        sys.stdout.write("{}\nTo encode: 'python ame_filter.py e'\nTo decode: 'python ame_filter.py d'\nAll files read from stdin and printed to stdout\n".format(NAME))
    elif sys.argv[1] == 'e':
        encode(input, output)
    elif sys.argv[1] == 'd':
        decode(input, output)
    else:
        error("Illegal argument '{}'. Try --help for more information".format(sys.argv[1]))
        sys.exit(1)

        
if __name__ == '__main__':
    main()

Clone_Setup.py

#!/usr/bin/env python
'''
Clone_Setup.py initializes the git environment. 
Each time a new instance of the repository is generated, these commands must 
be run.

'''
import os
import sys
import io
import subprocess
import ame_filter as amef
import tempfile
import shutil

# replacement strings
W_EOL = b'\r\n'
U_EOL = b'\n'

def setup_git():
    os.system("git config filter.ame_filter.smudge \"./ame_filter.py d\"")
    os.system("git config filter.ame_filter.clean \"./ame_filter.py e\"")
    
    '''
        Create .gitattributes programmatically. 
        Add these lines if they do not exist
    '''
    items = ["*.ame filter=ame_filter", "*.ame diff"]
    try:
        with open(".gitattributes", "x") as f:
            for item in items:
                f.write(item + "\n")
    except:
        with open(".gitattributes", "r+") as f:
            for item in items:
                f.seek(0)
                line_found = any(item in line for line in f)
                if not line_found:
                    f.seek(0, os.SEEK_END)
                    f.write("\n" + item)
    
    '''
        Create .gitignore programmatically. 
        Add these lines if they do not exist.
    '''
    items = ["*.gra",
             "*.res",
             "*.req",
             "*.pyc",
             "*.results",
             "*.results.*"
             ]
    
    try:
        with open(".gitignore", "x") as f:
            for item in items:
                f.write(item + "\n")
    except:
        with open(".gitignore", "r+") as f:
            for item in items:
                f.seek(0)
                line_found = any(item in line for line in f)
                if not line_found:
                    f.seek(0, os.SEEK_END)
                    f.write("\n" + item)
        

''' Search for AME files '''
def find_ame_files():
    out = subprocess.Popen(['git', 'ls-files'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    stdout, stderr = out.communicate()
    # Separate output by newlines
    # filter for files with .ame 
    git_files = stdout.decode('utf-8').split('\n') # split as strings
    # Create list of *just* amesim files
    ame_files = [entry for entry in git_files if entry.endswith(".ame")]
    ''' #  Equivalent code block
    ame_files = []
    for entry in git_files:
        if entry.endswith(".ame"):
            ame_files.append(entry)
    '''
    return ame_files

def decode_ame_files(ame_files):
    for file in ame_files:
        input = io.open(file, 'rb')
        tfp = tempfile.TemporaryFile(mode='w+b')
        # Write contents into temporary file
        tfp.write(input.read().replace(W_EOL, U_EOL))
        tfp.seek(0)
        input.close()
        output = io.open(file+'~', 'wb')
        try:
            amef.decode(tfp, output)
            output.close()
            shutil.move(file+'~', file)
        except:
            print("File is already decoded. Returning to normal.")
            output.close()
        finally:
            os.remove(file+'~')
            
            

def main():
    '''Main program'''
    print("Setting up git.")
    setup_git()
    print("Finding ame files.")
    ame_files = find_ame_files()
    print(ame_files)
    print("Decoding ame files.")
    decode_ame_files(ame_files)
    
        
if __name__ == '__main__':
    main()
    # Keep console open to view messages on windows machines.
    if os.name == 'nt':
        input("Press enter to exit")