Git zip / tar file implementasi pra-komit dan pasca-checkout hooks

Dec 22 2020

Saya secara teratur menggunakan alat (Amesim) yang mengemas file-nya dalam file tar yang tidak terkompresi. Untuk pembuatan versi, saya biasanya menamai file sebagai file1_Rev01.ame, dan mengulanginya dengan perubahan. Ini berfungsi ketika saya satu-satunya pengguna, tetapi akhir-akhir ini saya berbagi file / model lebih teratur. Sulit untuk mencoba membagikan model ini, karena sering kali menyertakan hasil yang cukup besar (gbs data), dan melacak perubahan antar versi jika sulit, kecuali jika menambahkan teks secara ketat dalam model pada setiap perubahan. (Amesim adalah alat seperti Simulink.)

Saya telah membaca tentang git hooks dan git filter, tetapi saya tidak yakin apa yang harus dilakukan untuk mengelola versi tarball dengan lebih baik.

Katakanlah saya memiliki file "my_file.tar" dan terdiri dari a.txt, b.model, c.data, dan d.results.

Dari sisi penggunaan, saya akan melakukan stage pada "my_file.tar" lalu commit dengan pesan "Pembaruan ke model". Tanpa perubahan pada git, git melacak perubahan pada file biner. Diff-nya tidak dapat dibaca dan menghabiskan banyak ruang. Jika hasil simulasi disertakan, file tersebut cukup besar. Mengkloning repo akan menjadi tantangan jika hasilnya terus disimpan.

Untuk usaha pertama saya, saya mencoba menggunakan hook pra-komit dan pasca-checkout.

Saat commit, hook pra-commit saya untars "my_file.tar" ke dalam direktori "my_file_tar." Ini menghapus file * .results yang berasal dari menjalankan model. Ini tidak perlu untuk melacak ini dan menghemat ruang yang signifikan (gbs).

Ketika saya menarik modelnya, post-checkout akan mencari folder apa pun dengan _tar dan tar, mengganti namanya menjadi my_file.tar.

Sekarang umumnya ini berhasil. Tapi, bagaimana cara menangani folder my_file.tar dan tidak terkompresi? Jika saya menghapus otomatis folder yang tidak dikompresi setelah check-out, git menyatakan bahwa saya memiliki perubahan signifikan untuk dilacak. Apakah saya perlu menambah / menghapus folder ke .gitignore setiap saat? Selain itu, file tar tidak akan pernah menunjukkan bahwa itu dilacak, karena saya menghapusnya dalam kode pra-komit. Apa yang dapat saya lakukan untuk membersihkan proses ini? Bagaimana saya harus menangani ini secara berbeda?

Referensi:

  • File Zip Git
  • Noda dan Bersih
  • Dokumen Git Office
  • Zippey
  • XLTrail

Untuk kode ini, .ame adalah file tar.

pra-komit

#!/usr/bin/env python

import argparse
import os
import tarfile
import zipfile
import subprocess

def parse_args():
    """Placeholder for command-line parsing; always yields ``None``."""
    return None

def log_file(log_item):
    """Append ``log_item`` followed by a newline to ``MyFile.txt``.

    The file is resolved relative to the current working directory and is
    opened in append mode. A context manager guarantees the handle is
    flushed and closed before returning.

    Returns:
        Always ``1``.
    """
    with open("MyFile.txt", "a") as log:
        log.write(log_item + '\n')
    return 1
    
def get_staged_ame_files():
    '''Ask git for the staged paths and keep only the ``*.ame`` ones.

    Runs ``git diff --staged --name-only`` with stderr folded into stdout,
    decodes the captured bytes as UTF-8 and splits them on newlines.

    Returns a list of path strings, or ``None`` when no staged path ends
    with ``.ame``.
    '''
    git = subprocess.Popen(
        ['git', 'diff', '--staged', '--name-only'],
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
    )
    captured, _ = git.communicate()

    ame_paths = [
        path
        for path in captured.decode('utf-8').split('\n')
        if path.endswith(".ame")
    ]
    # An empty selection is reported as None rather than as an empty list.
    return ame_paths or None

def extract_ame_files(file_list):
    """Extract each existing archive in ``file_list`` into a sibling folder.

    An archive ``name.ame`` (an uncompressed tar) is unpacked into the
    folder ``name_ame``. Paths that are not regular files are reported on
    stdout and skipped.

    Args:
        file_list: iterable of archive paths.

    Returns:
        List of the folder names that were extracted, in input order.
    """
    folder_list = []
    for list_item in file_list:
        if not os.path.isfile(list_item):
            print("File {} does not exist.".format(list_item))
            continue

        # splitext drops the real extension instead of assuming 4 characters.
        folder_name = os.path.splitext(list_item)[0] + "_ame"
        # The context manager closes the archive even if extraction fails.
        with tarfile.open(list_item, "r:") as tar:
            tar.extractall(path=folder_name)
        folder_list.append(folder_name)
        log_file(folder_name)

    return folder_list
    

def cleanup_ame_ignored_files(folder_list):
    '''Delete ``*.results`` and ``*.exe`` files directly inside each folder.

    Only the top level of every folder is inspected. Entries are removed
    through their full path (folder + name) so the function works no
    matter what the current working directory is.

    Args:
        folder_list: iterable of folder paths to clean.

    Returns:
        Always ``1``.
    '''
    ignored_suffixes = (".results", ".exe")
    for folder in folder_list:
        for entry in os.listdir(folder):
            target = os.path.join(folder, entry)
            # Skip directories that merely happen to carry such a suffix.
            if entry.endswith(ignored_suffixes) and os.path.isfile(target):
                os.remove(target)
    return 1


def git_add_ame_folders(folders):
    """Stage every folder in ``folders`` with git.

    For each folder ``git add <folder>/`` is run, then
    ``git add -u <folder>/``; the combined stdout/stderr of the second
    command is written to the log. Always returns ``1``.
    """
    for folder in folders:
        target = folder + '/'
        commands = (
            ['git', 'add', target],
            ['git', 'add', '-u', target],
        )
        for command in commands:
            proc = subprocess.Popen(
                command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
            )
            captured, _ = proc.communicate()

        # Only the output of the last command (the -u pass) is logged.
        log_file(captured.decode('utf-8'))
    return 1
    
def remove_ame_from_staging(file_list):
    """Run ``git rm --cached`` on every path in ``file_list``; returns ``1``."""
    for staged_path in file_list:
        proc = subprocess.Popen(
            ['git', 'rm', '--cached', staged_path],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
        )
        # Wait for git to finish; its output is not used.
        proc.communicate()
    return 1

def main(args=None):
    """Pre-commit entry point: swap staged ``*.ame`` files for extracted folders.

    Steps: collect the staged ``*.ame`` files, extract each one into a
    ``*_ame`` folder, stage those folders, then unstage the ``*.ame``
    files themselves.

    NOTE(review): no step here deletes ``.results`` files from the
    extracted folders before they are staged.

    Returns ``0`` when nothing is staged, ``1`` otherwise. ``args`` is
    accepted but not used.
    """
    staged_ame_files = get_staged_ame_files()
    if staged_ame_files:
        extracted_folders = extract_ame_files(staged_ame_files)
        git_add_ame_folders(extracted_folders)
        remove_ame_from_staging(staged_ame_files)
        return 1

    # No staged *.ame file: nothing to do.
    return 0

# Script entry point: run the hook with whatever parse_args() yields.
if __name__ == "__main__":
    main(parse_args())

dan pasca-checkout

#!/usr/bin/env python

import argparse
import os
import tarfile
import zipfile
import subprocess
import shutil
#from shutil import rmtree # Delete directory trees

def parse_args():
    """Placeholder for command-line parsing; always yields ``None``."""
    return None

def log_file(log_item):
    """Append ``log_item`` followed by a newline to ``MyFile2.txt``.

    The file is resolved relative to the current working directory and is
    opened in append mode. A context manager guarantees the handle is
    flushed and closed before returning.

    Returns:
        Always ``1``.
    """
    with open("MyFile2.txt", "a") as log:
        log.write(log_item + '\n')
    return 1
    
def compress_ame_files(folder_list):
    """Pack every ``*_ame`` folder back into an uncompressed ``*.ame`` tar.

    The archive name is the folder path with its last four characters
    (the ``_ame`` suffix) replaced by ``.ame``. Progress is written to the
    log for each folder.

    Args:
        folder_list: iterable of folder paths ending in ``_ame``.

    Returns:
        Always ``1``.
    """
    for list_item in folder_list:
        log_file("We're on item {}".format(list_item))
        file_name = list_item[0:-4] + ".ame"
        log_file("Tar file name {}".format(file_name))
        # Remove a stale archive first, but tolerate a fresh checkout where
        # the .ame file has never been created.
        try:
            os.remove(file_name)
        except FileNotFoundError:
            pass
        with tarfile.open(file_name, "w:") as tar:
            # os.path.basename('../') is '', so the folder's contents are
            # stored relative to the archive root rather than under the
            # folder name.
            tar.add(list_item, arcname=os.path.basename('../'))
    return 1
    

def cleanup_ame_ignored_files(folder_list):
    '''Delete ``*.results`` and ``*.exe`` files directly inside each folder.

    Only the top level of every folder is inspected. Entries are removed
    through their full path (folder + name) so the function works no
    matter what the current working directory is.

    Args:
        folder_list: iterable of folder paths to clean.

    Returns:
        Always ``1``.
    '''
    ignored_suffixes = (".results", ".exe")
    for folder in folder_list:
        for entry in os.listdir(folder):
            target = os.path.join(folder, entry)
            # Skip directories that merely happen to carry such a suffix.
            if entry.endswith(ignored_suffixes) and os.path.isfile(target):
                os.remove(target)
    return 1


def git_add_ame_folders(folders):
    """Stage every folder in ``folders`` with git.

    For each folder ``git add <folder>/`` is run, then
    ``git add -u <folder>/``. The command output is captured and
    discarded. Always returns ``1``.
    """
    for folder in folders:
        target = folder + '/'
        commands = (
            ['git', 'add', target],
            ['git', 'add', '-u', target],
        )
        for command in commands:
            proc = subprocess.Popen(
                command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
            )
            proc.communicate()
    return 1
    
def remove_ame_from_staging(file_list):
    """Run ``git rm --cached`` on every path in ``file_list``; returns ``1``."""
    for staged_path in file_list:
        proc = subprocess.Popen(
            ['git', 'rm', '--cached', staged_path],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
        )
        # Wait for git to finish; its output is not used.
        proc.communicate()
    return 1

def fast_scandir(dirname):
    """Return the paths of all directories below ``dirname``, recursively.

    The immediate sub-directories come first, followed by the recursive
    listing of each of them in turn. ``dirname`` itself is not included.
    """
    children = [entry.path for entry in os.scandir(dirname) if entry.is_dir()]
    collected = list(children)
    for child in children:
        collected += fast_scandir(child)
    return collected

def delete_ame_folders(folders):
    """Recursively delete every folder in ``folders``.

    A folder that cannot be removed (for example because it does not
    exist) is reported on stdout and the loop carries on with the next
    one.

    Args:
        folders: iterable of folder paths.

    Returns:
        Always ``1``.
    """
    for folder in folders:
        try:
            shutil.rmtree(folder)
        except OSError as e:
            # Report the folder that failed; the name used here must be the
            # loop variable.
            print("Error: %s : %s" % (folder, e.strerror))
    return 1
    
#def main(args=None):
def main(lines):
    """Post-checkout entry point: rebuild ``*.ame`` archives from ``*_ame`` folders.

    Every directory below the current working directory whose path ends
    in ``_ame`` is collected and handed to ``compress_ame_files``. The
    folders are left in place afterwards. ``lines`` is accepted but not
    used. Always returns ``1``.
    """
    print("Post checkout running.")
    here = os.getcwd()
    folder_list = [
        os.path.join(here, candidate)
        for candidate in fast_scandir(here)
        if candidate.endswith("_ame")
    ]
    # Tar each folder up under the matching .ame name.
    compress_ame_files(folder_list)
    return 1

# Script entry point: run the hook with whatever parse_args() yields.
if __name__ == "__main__":
    main(parse_args())

Jawaban

3 Rukie Dec 31 2020 at 00:46

Kode dalam jawaban ini mengimplementasikan filter git sebagai pengganti hook pra-komit dan hook pasca-checkout dalam pertanyaan. Keuntungan dari filter adalah hanya memanipulasi satu file. File tambahan tidak perlu dilacak dan di-commit / di-pull secara terpisah. Sebaliknya, seperti Zippey, ini membuat aliran data yang tidak terkompresi dan menghapus file yang tidak perlu di sepanjang jalan.

Catatan: Jangan gunakan pernyataan cetak, karena akan mengacaukan aliran stdout di filter git. Ini adalah pelajaran yang menyakitkan.

Catatan: Akhiran CRLF dan LF merupakan masalah. Saat melakukan decoding dari git pull pertama, saya harus membersihkan ujung baris karena Sourcetree / Git dikonversi ke format windows.

Diskusi solusi:

Karena file yang saya kerjakan adalah tar yang tidak terkompresi, solusi Zippey tidak berlaku secara langsung. Zippey hanya untuk file zip. Saya menerapkan teknik Zippey dengan file tar sebagai gantinya.

Saat komit, filter clean diterapkan yang 'menyandikan' file tar. Fungsi encode mengambil setiap file dan mencatat panjang data, panjang data mentah jika biner, mode penyimpanan (ascii atau biner), dan nama file.

Skrip encode mengalirkan semua file ke dalam satu file dengan nama yang sama dalam format yang tidak terkompresi. File biner adalah base64 yang dikodekan menjadi satu baris, membuat diff lebih mudah dibaca.

Selama encode, file dengan ekstensi tertentu dihindari (seperti file hasil).

Saat checkout / pull, filter smudge membangun kembali file tar dengan memanfaatkan empat tag meta untuk membaca informasi. Setiap file diproses dan ditambahkan ke objek file tar, dan pada akhirnya file tar ditulis.

Seperti Zippey, pada klon baru dari repositori, file yang disandikan ditarik yang tidak dapat dibaca oleh alat saya. Jadi Clone Setup mencari file * .ame saya yang dikodekan dan mendekodekannya, serta menyiapkan filter git yang sesuai.

Saat saya bekerja pada mesin linux dan windows, dan git memiliki kecenderungan untuk menambahkan CRLF saat checkout, skrip memastikan untuk menghapus CRLF sebelum encoding, dan menghapus CRLF dari file yang disandikan sebelum decoding.

ame_filter.py

import tarfile
import sys
import io
import base64
import string
import tempfile
import os.path

# When True, debug() writes diagnostic messages to stderr.
DEBUG_AME_FILTER = False
# Prefix used in debug/error messages and in the usage text.
NAME = 'Amesim_Git'
# Text encoding used for the meta lines and for the "is this text?" probe.
ENCODING = 'UTF-8'

# Windows and Unix line endings, as bytes.
W_EOL = b'\r\n'
U_EOL = b'\n'

# Extensions (written with their leading dot) that are treated as text.
AME_EXTENSIONS = ['.amegp', '.cir', '.sad', '.units', '.views', '.xml']
ASCII_EXTENSIONS = ['.txt', '.py']
# Extensions (written with their leading dot) that must not be tracked.
EXCLUDE = ['.results']

def debug(msg):
    '''Write ``msg`` to stderr, but only while DEBUG_AME_FILTER is truthy.'''
    if not DEBUG_AME_FILTER:
        return
    line = '{0}: debug: {1}\n'.format(NAME, msg)
    sys.stderr.write(line)

def error(msg):
    '''Write ``msg`` to stderr, prefixed with the tool name and "error".'''
    line = '{0}: error: {1}\n'.format(NAME, msg)
    sys.stderr.write(line)

def init():
    '''Prepare stdin/stdout for raw bytes; only Windows needs any work.'''
    debug("Running on {}".format(sys.platform))
    if not sys.platform.startswith('win'):
        return

    import msvcrt
    debug("Enable Windows binary workaround")
    # Switch both standard streams to binary mode, stdin first.
    for stream in (sys.stdin, sys.stdout):
        msvcrt.setmode(stream.fileno(), os.O_BINARY)

def encode(input, output):
    '''Encode a tar archive from ``input`` into the VCS friendly stream on ``output``.

    Each stored member is written as one meta line
    ``<data_len>|<raw_len>|<mode>|<name>`` terminated by a newline, then
    the payload, then a single newline separator.

    * Mode ``A`` (text): the payload is the member's bytes with CRLF
      replaced by LF; both lengths are the payload length.
    * Mode ``B`` (binary): the payload is the base64 encoding of the
      untouched member bytes; ``raw_len`` is the original byte count.

    A member is text when its bytes decode as ``ENCODING`` and either its
    extension is listed in ``AME_EXTENSIONS``/``ASCII_EXTENSIONS`` or
    every byte is an ASCII printable character. Directories are skipped,
    and so are members whose extension is in ``EXCLUDE`` -- including
    numbered variants such as ``x.results.1``.

    Args:
        input: binary file object positioned at the start of a tar archive.
        output: binary file object receiving the encoded stream.
    '''
    debug("ENCODE was called")
    text_extensions = set(AME_EXTENSIONS).union(ASCII_EXTENSIONS)
    printable = frozenset(string.printable.encode('ascii'))
    separator = "\n".encode(ENCODING)

    # tarfile needs a seekable binary file object, so stage the input in a
    # temporary file. Both handles are closed even when an error occurs.
    with tempfile.TemporaryFile(mode='w+b') as tfp:
        tfp.write(input.read())
        tfp.seek(0)  # Otherwise tarfile would start reading at EOF.
        with tarfile.open(fileobj=tfp, mode='r:') as tar:
            for name in tar.getnames():
                tarinfo = tar.getmember(name)
                if tarinfo.isdir():
                    continue  # Folders are not represented in the stream.

                # Keep the leading dot: the extension lists are written
                # with it ('.results', '.txt', ...).
                extension = os.path.splitext(name)[1].strip().lower()
                # Batched runs are stored as *.results.1, *.results.2, ...;
                # look through the numeric suffix for the real extension.
                if extension[1:].isnumeric():
                    root_name = os.path.splitext(name)[0]
                    real_extension = os.path.splitext(root_name)[1].strip().lower()
                    if real_extension in EXCLUDE:
                        continue
                if extension in EXCLUDE:
                    continue

                member = tar.extractfile(name)
                if member is None:
                    continue  # Not a regular file or link; nothing to read.
                data = member.read()

                try:
                    data.decode(ENCODING)
                    is_text = (extension in text_extensions
                               or all(byte in printable for byte in data))
                except UnicodeDecodeError:
                    is_text = False

                if is_text:
                    debug("Appending text file '{}'".format(name))
                    mode = 'A'  # ASCII Mode
                    # Line endings are normalised for text only, so binary
                    # payloads are stored exactly as found.
                    payload = data.replace(W_EOL, U_EOL)
                    raw_len = len(payload)
                else:
                    debug("Appending binary file '{}'".format(name))
                    mode = 'B'  # Binary Mode
                    raw_len = len(data)
                    payload = base64.b64encode(data)

                output.write("{}|{}|{}|{}\n".format(len(payload), raw_len, mode, name).encode(ENCODING))
                output.write(payload)
                output.write(separator)  # Separation from next meta line

def decode(input, output):
    '''Rebuild a tar archive on ``output`` from the VCS friendly stream on ``input``.

    The stream is a sequence of records: a meta line
    ``<data_len>|<raw_len>|<mode>|<name>``, ``data_len`` payload bytes and
    one separator byte. Mode ``A`` payloads are stored as-is; mode ``B``
    payloads are base64-decoded first. The finished archive is written to
    ``output`` in one piece once every record has been processed.

    Args:
        input: binary file object providing ``readline`` and ``read``.
        output: binary file object receiving the tar archive.

    Raises:
        SystemExit: with status 1 when a record has an unknown mode.
        ValueError: when a meta line does not have the expected fields.
    '''
    debug("DECODE was called")
    # Both handles are closed on every exit path, including errors.
    with tempfile.TemporaryFile(mode='w+b') as tfp:
        with tarfile.open(fileobj=tfp, mode='w:') as tar:
            while True:
                meta = input.readline().decode(ENCODING)
                if not meta:
                    break
                # maxsplit=3 keeps any '|' inside the member name intact.
                (data_len, raw_len, mode, name) = [
                    t(s) for (t, s) in zip((int, int, str, str), meta.split('|', 3))
                ]
                name = name.rstrip()  # Drop the line terminator.

                if mode == 'A':
                    debug("Appending text file '{}'".format(name))
                    payload = input.read(data_len)
                elif mode == 'B':
                    debug("Appending binary file '{}'".format(name))
                    payload = base64.b64decode(input.read(data_len))
                else:
                    # Should never reach here
                    error('Illegal mode "{}"'.format(mode))
                    sys.exit(1)

                info = tarfile.TarInfo(name=name)
                info.size = raw_len
                tar.addfile(tarinfo=info, fileobj=io.BytesIO(payload))
                input.read(1)  # Skip the separator after the payload.

        # The archive is closed (all blocks flushed) but the temporary file
        # is still open: copy it to the output from the beginning.
        tfp.seek(0)
        output.write(tfp.read())

def main():
    '''Command-line entry point of the filter.

    ``e`` encodes stdin to stdout, ``d`` decodes stdin to stdout. With no
    argument, ``-`` or ``--help`` the usage text is written to stdout.
    Any other argument reports an error on stderr and exits with status 1.
    '''
    init()
    # closefd=False: these wrappers share the process's real stdin/stdout
    # descriptors and must not close them when they are garbage collected.
    input = io.open(sys.stdin.fileno(), 'rb', closefd=False)
    output = io.open(sys.stdout.fileno(), 'wb', closefd=False)
    try:
        if len(sys.argv) < 2 or sys.argv[1] == '-' or sys.argv[1] == '--help':
            # Send the usage text through the same binary writer as the
            # payload so all stdout traffic uses one buffer.
            usage = "{}\nTo encode: 'python ame_filter.py e'\nTo decode: 'python ame_filter.py d'\nAll files read from stdin and printed to stdout\n".format(NAME)
            output.write(usage.encode(ENCODING))
        elif sys.argv[1] == 'e':
            encode(input, output)
        elif sys.argv[1] == 'd':
            decode(input, output)
        else:
            error("Illegal argument '{}'. Try --help for more information".format(sys.argv[1]))
            sys.exit(1)
    finally:
        # Push buffered bytes out explicitly instead of relying on object
        # finalisation at interpreter exit.
        output.flush()

        
# Run the filter only when executed as a script, not when imported.
if __name__ == '__main__':
    main()

Clone_Setup.py

#!/usr/bin/env python
'''
Clone_Setup.py initializes the git environment. 
Each time a new instance of the repository is generated, these commands must 
be run.

'''
import os
import sys
import io
import subprocess
import ame_filter as amef
import tempfile
import shutil

# Line-ending byte strings: Windows CRLF and the Unix LF that replaces it
# before an encoded file is decoded.
W_EOL = b'\r\n'
U_EOL = b'\n'

def _ensure_lines(path, items):
    """Create ``path`` holding ``items``, or append the ones it lacks.

    An item counts as present only when some line of the file equals it
    after stripping surrounding whitespace, so ``*.res`` is not mistaken
    for being covered by ``*.results``.
    """
    try:
        # "x" fails when the file already exists, so nothing is clobbered.
        with open(path, "x") as f:
            f.writelines(item + "\n" for item in items)
        return
    except FileExistsError:
        pass

    with open(path, "r+") as f:
        content = f.read()
        present = {line.strip() for line in content.splitlines()}
        missing = [item for item in items if item not in present]
        if not missing:
            return
        f.seek(0, os.SEEK_END)
        # Start on a fresh line when the file does not end with a newline.
        if content and not content.endswith("\n"):
            f.write("\n")
        f.writelines(item + "\n" for item in missing)


def setup_git():
    """Register the ame_filter clean/smudge filter and seed the git dot-files.

    * Sets ``filter.ame_filter.smudge`` and ``filter.ame_filter.clean`` in
      the git configuration.
    * Makes sure ``.gitattributes`` routes ``*.ame`` through the filter.
    * Makes sure ``.gitignore`` lists the generated result files.

    Both dot-files are created when absent; otherwise only missing lines
    are appended.
    """
    os.system("git config filter.ame_filter.smudge \"./ame_filter.py d\"")
    os.system("git config filter.ame_filter.clean \"./ame_filter.py e\"")

    _ensure_lines(".gitattributes", ["*.ame filter=ame_filter", "*.ame diff"])

    _ensure_lines(".gitignore", [
        "*.gra",
        "*.res",
        "*.req",
        "*.pyc",
        "*.results",
        "*.results.*",
    ])
        

def find_ame_files():
    '''Search the git index for AME files.

    Runs ``git ls-files`` (stderr folded into stdout), decodes the output
    as UTF-8, splits it on newlines and returns the entries that end with
    ``.ame`` as a list of strings.
    '''
    listing = subprocess.Popen(
        ['git', 'ls-files'],
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
    )
    captured, _ = listing.communicate()

    ame_files = []
    for tracked in captured.decode('utf-8').split('\n'):
        if tracked.endswith(".ame"):
            ame_files.append(tracked)
    return ame_files

def decode_ame_files(ame_files):
    """Decode each encoded ``*.ame`` file in place.

    The file's bytes (with CRLF normalised to LF) are decoded into a
    scratch file ``<file>~``. On success the scratch file replaces the
    original. If decoding fails the file is assumed to be decoded
    already: a message is printed, the scratch file is discarded and the
    original is left untouched.

    Args:
        ame_files: iterable of paths to ``*.ame`` files.
    """
    for file in ame_files:
        scratch = file + '~'
        with io.open(file, 'rb') as source:
            encoded = source.read().replace(W_EOL, U_EOL)

        try:
            with io.open(scratch, 'wb') as output:
                amef.decode(io.BytesIO(encoded), output)
        except (Exception, SystemExit):
            # SystemExit is included because decode() may report a bad
            # record via sys.exit(); KeyboardInterrupt is left alone.
            print("File is already decoded. Returning to normal.")
            if os.path.exists(scratch):
                os.remove(scratch)
        else:
            # After the move the scratch file is gone, so there is nothing
            # left to clean up on the success path.
            shutil.move(scratch, file)
            
            

def main():
    '''Configure git, then decode every tracked ``*.ame`` file.

    Progress messages and the list of files found are printed to stdout.
    '''
    print("Setting up git.")
    setup_git()

    print("Finding ame files.")
    tracked_ame = find_ame_files()
    print(tracked_ame)

    print("Decoding ame files.")
    decode_ame_files(tracked_ame)
    
        
# Run the setup only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
    # Keep console open to view messages on windows machines.
    if os.name == 'nt':
        input("Press enter to exit")