From dd19c48a1a6f5483f7a6f8436f08bc2988c149bf Mon Sep 17 00:00:00 2001 From: Reiner Herrmann Date: Wed, 30 Aug 2017 12:45:33 +0200 Subject: Refactor device handling into class --- loopertrx.py | 244 ++++++++++++++++++++++++++++++----------------------------- 1 file changed, 126 insertions(+), 118 deletions(-) (limited to 'loopertrx.py') diff --git a/loopertrx.py b/loopertrx.py index 4401e8a..cbbef1e 100755 --- a/loopertrx.py +++ b/loopertrx.py @@ -30,132 +30,140 @@ ENDPOINT_OUT = 0x01 COMMAND_SIZE = 0xfe COMMAND_DATA = 0xff -def random_tag(): - return random.randint(0, 1<<32 - 1) - -def mass_storage_header(data_len, cdb_len, tag=None): - header = "USBC".encode('ascii') - if not tag: - tag = random_tag() - flags = 0x80 - target = 0x00 - header += struct.pack(' 0: - bufsize = (size >= 65536) and 65536 or size - # data needs to be transferred in multiples of 1k blocks - padding = (1024 - (bufsize % 1024)) % 1024 - - buf = get_data(dev, bufsize + padding) - print('.', end='', flush=True), - outfile.write(buf[:bufsize]) - size -= bufsize - print(" Done.") - -def transmit_file(dev, filename): - with open(filename, 'rb') as infile: - content = infile.read() - tag = random_tag() - # skip first 44 bytes for now; we assume valid file. TODO: validate - content = content[44:] - content_size = len(content) - print("Transmitting ", end='', flush=True) - while len(content) > 0: - buf = content[:65536] - padsize = (1024 - (len(buf) % 1024)) % 1024 - buf += b'\x00' * padsize - - send_data(dev, buf, tag) - print('.', end='', flush=True), - content = content[65536:] - submit_data_len(dev, content_size, tag) - print(" Done.") + +class USBLooper(): + def __init__(self): + self.dev = usb.core.find(idVendor=LOOPER_VID, idProduct=LOOPER_PID) + if not self.dev: + raise FileNotFoundError("Device not found.") + if self.dev.is_kernel_driver_active(0): + self.dev.detach_kernel_driver(0) + self.dev.set_configuration() + + def random_tag(self): + return random.randint(0, 1 << 32 - 1) + + def mass_storage_header(self, data_len, cdb_len, tag=None): + header = "USBC".encode('ascii') + if not tag: + tag = self.random_tag() + flags = 0x80 + target = 0x00 + header += struct.pack(' 0: + bufsize = (size >= 65536) and 65536 or size + # data needs to be transferred in multiples of 1k blocks + padding = (1024 - (bufsize % 1024)) % 1024 + + buf = self.get_data(bufsize + padding) + print('.', end='', flush=True), + outfile.write(buf[:bufsize]) + size -= bufsize + print(" Done.") + + def transmit_file(self, filename): + with open(filename, 'rb') as infile: + content = infile.read() + tag = self.random_tag() + # skip first 44 bytes for now; we assume valid file. TODO: validate + content = content[44:] + content_size = len(content) + print("Transmitting ", end='', flush=True) + while len(content) > 0: + buf = content[:65536] + padsize = (1024 - (len(buf) % 1024)) % 1024 + buf += b'\x00' * padsize + + self.send_data(buf, tag) + print('.', end='', flush=True), + content = content[65536:] + self.submit_data_len(content_size, tag) + print(" Done.") + def main(): argp = argparse.ArgumentParser() argp.add_argument('action', choices=['rx', 'tx']) argp.add_argument('filename') args = argp.parse_args() - dev = init_device() + + try: + dev = USBLooper() + except FileNotFoundError as e: + print(e) + sys.exit(1) + if args.action == 'rx': - receive_file(dev, args.filename) + dev.receive_file(args.filename) elif args.action == 'tx': - transmit_file(dev, args.filename) + dev.transmit_file(args.filename) + if __name__ == "__main__": main() -- cgit v1.2.3