# afs.py
# Easily extract, modify and repack the AFS archive used by Sora no Woto.
# Usage: afs.py FILE DESTDIR METAFILE
# Copyright (C) 2013 Shiz.
# Released under the terms of the WTFPL: see http://www.wtfpl.net/txt/copying/ for details.
import sys
import os
import os.path as path
import struct
import time
import datetime
import collections
import pickle


class AFSArchive:
    """Reader/writer for the AFS container format.

    An AFS archive is a flat container: a header with a table of contents
    (offset/size pairs), BOUNDARY-aligned file payloads, and a trailing
    metadata block with per-file names and timestamps.

    Entries sharing one metadata name form a "category"; a category with
    more than one entry behaves like a directory whose members are
    addressed by a 1-based index.  Modifications and additions live in
    ``self.cache`` until ``save()`` is called; cached data always wins
    over data still in the archive.
    """

    # b'AFS\x00' read as a little-endian 32-bit integer.
    MAGIC = 0x00534641
    # File payload offsets are aligned to this many bytes.
    BOUNDARY = 2048
    SLOGAN = b'This file has been packed by afs.py, freely available from https://salty-salty-studios.com/shiz/code/afs.py. Have a nice day, alright?'

    def __init__(self, filename=None):
        self.verbose = False
        self.file_count = 0
        self.meta_offset = 0
        self.handle = None
        self.filename = None
        # category -> [(offset, size, datetime), ...] for entries on disk.
        self.files = collections.OrderedDict()
        # category -> {0-based index: (contents, datetime)} for changes/additions.
        self.cache = collections.OrderedDict()
        # [(category, 1-based index), ...] in on-disk order, for repacking.
        self.original_order = []
        if filename:
            self.open(filename)

    def open(self, filename):
        """Open an existing archive and parse its table of contents.

        Raises ValueError when the file does not carry the AFS magic.

        NOTE(review): the on-disk layout parsing below was reconstructed
        from the standard AFS container format (the original source was
        garbled in transit); verify against a known-good archive.
        """
        if self.handle:
            self.close()
        self.filename = filename
        self.handle = open(filename, 'rb')

        magic = struct.unpack('<I', self.handle.read(4))[0]
        if magic != self.MAGIC:
            raise ValueError('{} is not an AFS archive.'.format(filename))
        self.file_count = struct.unpack('<I', self.handle.read(4))[0]

        # Table of contents: (offset, size) per entry, followed by the
        # offset/size of the trailing metadata block.
        entries = [struct.unpack('<II', self.handle.read(8))
                   for _ in range(self.file_count)]
        self.meta_offset, _meta_size = struct.unpack('<II', self.handle.read(8))

        # Metadata block: 48 bytes per entry -- a 32-byte NUL-padded name,
        # six 16-bit timestamp fields, and a (redundant) 32-bit size.
        self.files = collections.OrderedDict()
        self.original_order = []
        self.handle.seek(self.meta_offset)
        for offset, size in entries:
            meta = self.handle.read(48)
            name = meta[:32].rstrip(b'\x00').decode('shift-jis', 'replace')
            year, month, day, hour, minute, second = struct.unpack('<6H', meta[32:44])
            try:
                ftime = datetime.datetime(year, month, day, hour, minute, second)
            except ValueError:
                # Some archives carry zeroed or garbage timestamps.
                ftime = datetime.datetime.now()
            self.files.setdefault(name, []).append((offset, size, ftime))
            self.original_order.append((name, len(self.files[name])))

    def close(self):
        """Close the underlying archive handle; cached changes are kept."""
        if self.handle:
            self.handle.close()
            self.handle = None
        self.filename = None

    def build_order(self):
        """Build the (category, 1-based index) order entries are written in.

        Archive categories come first in their original order, each entry
        superseded by its cached version where one exists and followed by
        cached entries extending the category; purely-new categories
        follow in cache insertion order.

        NOTE(review): reconstructed -- the original ordering logic was
        garbled; the produced order covers the same set of entries.
        """
        order = []
        for category, files in self.files.items():
            total = len(files)
            if category in self.cache and self.cache[category]:
                total = max(total, max(self.cache[category]) + 1)
            order.extend((category, i + 1) for i in range(total))
        # Categories that exist only in the cache.
        for category, files in self.cache.items():
            if category in self.files:
                continue
            order.extend((category, i + 1) for i in sorted(files))
        return order

    def save(self, filename, order=None):
        """Write the archive, with cached modifications applied, to `filename`.

        order: optional explicit (category, 1-based index) write order,
        e.g. a previously pickled ``repack_data()['order']``.
        Raises ValueError when saving over the currently opened file,
        because payloads are streamed from the open handle.

        NOTE(review): layout writing reconstructed from the standard AFS
        format; verify round-trips against the game before shipping.
        """
        if filename == self.filename:
            raise ValueError('File name to save to can\'t be the same as already opened file name.')
        if order is None:
            order = self.build_order()

        def align(n):
            # Round n up to the next BOUNDARY multiple.
            return (n + self.BOUNDARY - 1) // self.BOUNDARY * self.BOUNDARY

        with open(filename, 'wb') as target:
            # Magic.
            target.write(struct.pack('<I', self.MAGIC))
            # File count.
            target.write(struct.pack('<I', len(order)))

            # Precompute aligned payload offsets following the header.
            header_size = 4 + 4 + len(order) * 8 + 8
            blobs = [self.read(category, index) for category, index in order]
            offsets = []
            offset = align(header_size)
            for data in blobs:
                offsets.append(offset)
                offset = align(offset + len(data))
            meta_offset = offset
            meta_size = len(order) * 48

            # Table of contents, then the metadata block pointer.
            for off, data in zip(offsets, blobs):
                target.write(struct.pack('<II', off, len(data)))
            target.write(struct.pack('<II', meta_offset, meta_size))

            # Payloads, padded out to their aligned offsets.
            pos = header_size
            for off, data in zip(offsets, blobs):
                padding = off - pos
                if pos == header_size:
                    # NOTE(review): the original embedded SLOGAN somewhere;
                    # the header slack is the most plausible spot.
                    target.write(self.SLOGAN[:padding].ljust(padding, b'\x00'))
                else:
                    target.write(b'\x00' * padding)
                target.write(data)
                pos = off + len(data)

            # Metadata block: name, timestamp fields, size per entry.
            target.write(b'\x00' * (meta_offset - pos))
            for (category, index), data in zip(order, blobs):
                ftime = self._entry_time(category, index)
                target.write(category.encode('shift-jis', 'replace')[:32].ljust(32, b'\x00'))
                target.write(struct.pack('<6H', ftime.year, ftime.month, ftime.day,
                                         ftime.hour, ftime.minute, ftime.second))
                target.write(struct.pack('<I', len(data)))

    def _entry_time(self, name, index):
        """Timestamp of entry `index` (1-based) in `name`, cache first."""
        if name in self.cache and index - 1 in self.cache[name]:
            return self.cache[name][index - 1][1]
        return self.files[name][index - 1][2]

    def is_directory(self, name):
        """Whether `name` is a multi-entry category ("directory")."""
        entries = len(self.files.get(name, ()))
        if name in self.cache:
            entries = max(entries, len(self.cache[name]))
        return entries > 1

    def read(self, name, index=None):
        """Return the contents of entry `index` (1-based) of category `name`.

        Cached (modified/added) data wins over the archive contents.
        Raises ValueError when `name` is a directory and no index is
        given, or when the index is out of range.
        """
        if self.is_directory(name):
            if index is None:
                raise ValueError('Can\'t read directory {} without knowing the file index to read.'.format(name))
            # Bound the index by archive and cached entries combined; the
            # original only consulted self.files and raised KeyError for
            # cache-only directories.
            total = len(self.files.get(name, ()))
            if name in self.cache and self.cache[name]:
                total = max(total, max(self.cache[name]) + 1)
            if index > total or index < 1:
                raise ValueError('Invalid file index to read from {}: {}.'.format(name, index))
        if index is None:
            index = 1
        # Return cached, modified data if available, else read directly
        # from the archive.
        if name in self.cache and index - 1 in self.cache[name]:
            data, ftime = self.cache[name][index - 1]
            return data
        offset, size, ftime = self.files[name][index - 1]
        self.handle.seek(offset)
        return self.handle.read(size)

    def change(self, name, contents, index=None, time=None):
        """Replace entry `index` (1-based, default 1) of `name` with `contents`.

        `time` (parameter name shadows the time module; kept for API
        compatibility) defaults to now.
        """
        if name not in self.cache:
            self.cache[name] = {}
        if index is None:
            index = 1
        self.cache[name][index - 1] = contents, time or datetime.datetime.now()

    def add(self, name, contents, time=None):
        """Append a new cached entry to category `name`; return its 1-based index."""
        if name not in self.cache:
            self.cache[name] = {}
        self.cache[name][len(self.cache[name])] = contents, time or datetime.datetime.now()
        return len(self.cache[name])

    def remove(self, name, index=None):
        """Remove a whole category, or one entry of a directory category."""
        if self.is_directory(name):
            if index is None:
                # Remove the entire directory, cached and archived alike.
                if name in self.cache:
                    del self.cache[name]
                if name in self.files:
                    del self.files[name]
            else:
                if name in self.cache and index - 1 in self.cache[name].keys():
                    del self.cache[name][index - 1]
                # Bug fix: the original bounded this check by the *cache*
                # length while deleting from self.files.
                if name in self.files and index - 1 < len(self.files[name]):
                    del self.files[name][index - 1]
        else:
            if name in self.files:
                del self.files[name]
            if name in self.cache:
                del self.cache[name]

    def list(self):
        """List entry names; directory members appear as 'category/index' (1-based)."""
        merged_list = []
        for category, data in self.files.items():
            if len(data) > 1:
                merged_list.extend('{}/{}'.format(category, x) for x in range(1, len(data) + 1))
            else:
                merged_list.append(category)
        for category, data in self.cache.items():
            l = len(data)
            if category in self.files.keys():
                l += len(self.files[category])
            if l > 1:
                for x in data.keys():
                    # Bug fix: cache keys are 0-based but listed names are
                    # 1-based everywhere else (extract(), read()).
                    name = '{}/{}'.format(category, x + 1)
                    if name not in merged_list:
                        merged_list.append(name)
            elif category not in merged_list:
                merged_list.append(category)
        return merged_list

    def extract(self, target_dir):
        """Extract every entry into `target_dir`, preserving timestamps."""
        os.makedirs(target_dir, exist_ok=True)

        def write_out(name, contents, ftime):
            # Shared tail: write the bytes and propagate the entry's mtime.
            target = path.join(target_dir, name)
            with open(target, 'wb') as f:
                f.write(contents)
            unixtime = time.mktime(ftime.timetuple())
            os.utime(target, (unixtime, unixtime))

        def extract_from_handle(name, offset, size, ftime):
            self.log('Extracting {} (offset = {}, size = {}, time = {})', name, offset, size, ftime)
            self.handle.seek(offset)
            write_out(name, self.handle.read(size), ftime)

        def extract_from_cache(name, contents, ftime):
            self.log('Extracting {} (time = {})', name, ftime)
            write_out(name, contents, ftime)

        for category, files in self.files.items():
            if len(files) > 1:
                os.makedirs(path.join(target_dir, category), exist_ok=True)
                for i, (offset, size, ftime) in enumerate(files):
                    # Entries superseded by the cache are written by the
                    # cache pass below.
                    if category in self.cache and i in self.cache[category].keys():
                        continue
                    extract_from_handle(path.join(category, str(i + 1)), offset, size, ftime)
            else:
                if category in self.cache:
                    continue
                offset, size, ftime = files[0]
                extract_from_handle(category, offset, size, ftime)

        for category, files in self.cache.items():
            if len(files) > 1:
                os.makedirs(path.join(target_dir, category), exist_ok=True)
                # Bug fix: `files` is a dict keyed by 0-based index; the
                # original enumerate()d it and tried to unpack the int keys.
                for i, (contents, ftime) in sorted(files.items()):
                    extract_from_cache(path.join(category, str(i + 1)), contents, ftime)
            else:
                contents, ftime = files[0]
                extract_from_cache(category, contents, ftime)

    def repack_data(self):
        """Data needed to later repack the archive in its original entry order."""
        return {
            'order': self.original_order
        }

    def log(self, format, *args, **kwargs):
        # Only chatty when verbose; `format` name kept for API compatibility.
        if self.verbose:
            print(format.format(*args, **kwargs))


if __name__ == "__main__":
    if len(sys.argv) < 4:
        sys.exit("Usage: {} FILE DESTDIR METAFILE".format(sys.argv[0]))
    arc = AFSArchive(sys.argv[1])
    arc.extract(sys.argv[2])
    # Bug fix: pickle.dump() requires a binary file object; the original
    # passed the file *name*, which raised TypeError at runtime.
    with open(sys.argv[3], 'wb') as meta:
        pickle.dump(arc.repack_data(), meta, protocol=3)