aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorReiner Herrmann <reiner@reiner-h.de>2017-08-30 12:45:33 +0200
committerReiner Herrmann <reiner@reiner-h.de>2017-08-30 12:45:33 +0200
commitdd19c48a1a6f5483f7a6f8436f08bc2988c149bf (patch)
tree6373e01c8002c4bbeb1b2bc493082aeb7c466b3f
parenta91d06ba3ef3e2eb3cc8d122bc44094330b30357 (diff)
Refactor device handling into class
-rwxr-xr-xloopertrx.py244
1 files changed, 126 insertions, 118 deletions
diff --git a/loopertrx.py b/loopertrx.py
index 4401e8a..cbbef1e 100755
--- a/loopertrx.py
+++ b/loopertrx.py
@@ -30,132 +30,140 @@ ENDPOINT_OUT = 0x01
COMMAND_SIZE = 0xfe
COMMAND_DATA = 0xff
-def random_tag():
- return random.randint(0, 1<<32 - 1)
-
-def mass_storage_header(data_len, cdb_len, tag=None):
- header = "USBC".encode('ascii')
- if not tag:
- tag = random_tag()
- flags = 0x80
- target = 0x00
- header += struct.pack('<iiBBB', tag, data_len, flags, target, cdb_len)
- return header
-
-def command_header(command, data_len, flag1, flag2, tag=None):
- cdb = bytes([0xcb, command, flag1, 0x00, flag2, 0x00, 0x00, 0x00, 0x00, 0x00])
- header = mass_storage_header(data_len, len(cdb), tag) + cdb
- header += bytes([0x00 for padding in range(31-len(header))])
- return header
-
-def ack_data(dev):
- dev.read(ENDPOINT_IN, 32)
-
-def get_size(dev):
- header = command_header(COMMAND_SIZE, 5, 0x00, 0x01)
- dev.write(ENDPOINT_OUT, header)
- length = dev.read(ENDPOINT_IN, 100)
- ack_data(dev)
- if length[0] == 1:
- return 0
- return length[1] + (length[2] << 8) + (length[3] << 16) + (length[4] << 24)
-
-def submit_data_len(dev, size, tag=None):
- header = command_header(COMMAND_SIZE, 5, 0x00, 0x00, tag)
- data_size = struct.pack('<bi', 0x00, size)
- dev.write(ENDPOINT_OUT, header)
- dev.write(ENDPOINT_OUT, data_size)
- ack_data(dev)
-
-def get_data(dev, nbytes):
- header = command_header(COMMAND_DATA, nbytes, 0x01, 0x01)
- dev.write(ENDPOINT_OUT, header)
- buf = dev.read(ENDPOINT_IN, nbytes)
- ack_data(dev)
- return buf
-
-def send_data(dev, data, tag=None):
- header = command_header(COMMAND_DATA, len(data), 0x01, 0x00, tag)
- dev.write(ENDPOINT_OUT, header)
- dev.write(ENDPOINT_OUT, data)
- ack_data(dev)
-
-def write_wav_header(outfile, data_size):
- header_size = 44
- header = "RIFF".encode('ascii')
- header += struct.pack('<i', data_size + header_size - 8)
- header += "WAVE".encode('ascii')
- header += "fmt ".encode('ascii')
- fmt = 0x01 # PCM
- nchan = 1
- rate = 48000
- fsize = 3
- bps = rate * fsize
- bits = 24
- header += struct.pack('<ihhiihh', 16, fmt, nchan, rate, bps, fsize, bits)
- header += "data".encode('ascii')
- header += struct.pack('<i', data_size)
- outfile.write(header)
-
-def init_device():
- dev = usb.core.find(idVendor=LOOPER_VID, idProduct=LOOPER_PID)
- if not dev:
- print("Device not found.")
- sys.exit(1)
- if dev.is_kernel_driver_active(0):
- dev.detach_kernel_driver(0)
- dev.set_configuration()
- return dev
-
-def receive_file(dev, filename):
- size = get_size(dev)
- if size == 0:
- print("No data available.")
- sys.exit(0)
-
- with open(filename, 'wb') as outfile:
- write_wav_header(outfile, size)
- print("Receiving ", end='', flush=True)
- while size > 0:
- bufsize = (size >= 65536) and 65536 or size
- # data needs to be transferred in multiples of 1k blocks
- padding = (1024 - (bufsize % 1024)) % 1024
-
- buf = get_data(dev, bufsize + padding)
- print('.', end='', flush=True),
- outfile.write(buf[:bufsize])
- size -= bufsize
- print(" Done.")
-
-def transmit_file(dev, filename):
- with open(filename, 'rb') as infile:
- content = infile.read()
- tag = random_tag()
- # skip first 44 bytes for now; we assume valid file. TODO: validate
- content = content[44:]
- content_size = len(content)
- print("Transmitting ", end='', flush=True)
- while len(content) > 0:
- buf = content[:65536]
- padsize = (1024 - (len(buf) % 1024)) % 1024
- buf += b'\x00' * padsize
-
- send_data(dev, buf, tag)
- print('.', end='', flush=True),
- content = content[65536:]
- submit_data_len(dev, content_size, tag)
- print(" Done.")
+
+class USBLooper():
+ def __init__(self):
+ self.dev = usb.core.find(idVendor=LOOPER_VID, idProduct=LOOPER_PID)
+ if not self.dev:
+ raise FileNotFoundError("Device not found.")
+ if self.dev.is_kernel_driver_active(0):
+ self.dev.detach_kernel_driver(0)
+ self.dev.set_configuration()
+
+ def random_tag(self):
+ return random.randint(0, 1 << 32 - 1)
+
+ def mass_storage_header(self, data_len, cdb_len, tag=None):
+ header = "USBC".encode('ascii')
+ if not tag:
+ tag = self.random_tag()
+ flags = 0x80
+ target = 0x00
+ header += struct.pack('<iiBBB', tag, data_len, flags, target, cdb_len)
+ return header
+
+ def command_header(self, command, data_len, flag1, flag2, tag=None):
+ cdb = bytes([0xcb, command, flag1, 0x00, flag2, 0x00, 0x00, 0x00, 0x00, 0x00])
+ header = self.mass_storage_header(data_len, len(cdb), tag) + cdb
+ header += bytes([0x00 for padding in range(31-len(header))])
+ return header
+
+ def ack_data(self):
+ self.dev.read(ENDPOINT_IN, 32)
+
+ def get_size(self):
+ header = self.command_header(COMMAND_SIZE, 5, 0x00, 0x01)
+ self.dev.write(ENDPOINT_OUT, header)
+ length = self.dev.read(ENDPOINT_IN, 100)
+ self.ack_data()
+ if length[0] == 1:
+ return 0
+ return length[1] + (length[2] << 8) + (length[3] << 16) + (length[4] << 24)
+
+ def submit_data_len(self, size, tag=None):
+ header = self.command_header(COMMAND_SIZE, 5, 0x00, 0x00, tag)
+ data_size = struct.pack('<bi', 0x00, size)
+ self.dev.write(ENDPOINT_OUT, header)
+ self.dev.write(ENDPOINT_OUT, data_size)
+ self.ack_data()
+
+ def get_data(self, nbytes):
+ header = self.command_header(COMMAND_DATA, nbytes, 0x01, 0x01)
+ self.dev.write(ENDPOINT_OUT, header)
+ buf = self.dev.read(ENDPOINT_IN, nbytes)
+ self.ack_data()
+ return buf
+
+ def send_data(self, data, tag=None):
+ header = self.command_header(COMMAND_DATA, len(data), 0x01, 0x00, tag)
+ self.dev.write(ENDPOINT_OUT, header)
+ self.dev.write(ENDPOINT_OUT, data)
+ self.ack_data()
+
+ def write_wav_header(self, outfile, data_size):
+ header_size = 44
+ header = "RIFF".encode('ascii')
+ header += struct.pack('<i', data_size + header_size - 8)
+ header += "WAVE".encode('ascii')
+ header += "fmt ".encode('ascii')
+ fmt = 0x01 # PCM
+ nchan = 1
+ rate = 48000
+ fsize = 3
+ bps = rate * fsize
+ bits = 24
+ header += struct.pack('<ihhiihh', 16, fmt, nchan, rate, bps, fsize, bits)
+ header += "data".encode('ascii')
+ header += struct.pack('<i', data_size)
+ outfile.write(header)
+
+ def receive_file(self, filename):
+ size = self.get_size()
+ if size == 0:
+ print("No data available.")
+ sys.exit(0)
+
+ with open(filename, 'wb') as outfile:
+ self.write_wav_header(outfile, size)
+ print("Receiving ", end='', flush=True)
+ while size > 0:
+ bufsize = (size >= 65536) and 65536 or size
+ # data needs to be transferred in multiples of 1k blocks
+ padding = (1024 - (bufsize % 1024)) % 1024
+
+ buf = self.get_data(bufsize + padding)
+ print('.', end='', flush=True),
+ outfile.write(buf[:bufsize])
+ size -= bufsize
+ print(" Done.")
+
+ def transmit_file(self, filename):
+ with open(filename, 'rb') as infile:
+ content = infile.read()
+ tag = self.random_tag()
+ # skip first 44 bytes for now; we assume valid file. TODO: validate
+ content = content[44:]
+ content_size = len(content)
+ print("Transmitting ", end='', flush=True)
+ while len(content) > 0:
+ buf = content[:65536]
+ padsize = (1024 - (len(buf) % 1024)) % 1024
+ buf += b'\x00' * padsize
+
+ self.send_data(buf, tag)
+ print('.', end='', flush=True),
+ content = content[65536:]
+ self.submit_data_len(content_size, tag)
+ print(" Done.")
+
def main():
argp = argparse.ArgumentParser()
argp.add_argument('action', choices=['rx', 'tx'])
argp.add_argument('filename')
args = argp.parse_args()
- dev = init_device()
+
+ try:
+ dev = USBLooper()
+ except FileNotFoundError as e:
+ print(e)
+ sys.exit(1)
+
if args.action == 'rx':
- receive_file(dev, args.filename)
+ dev.receive_file(args.filename)
elif args.action == 'tx':
- transmit_file(dev, args.filename)
+ dev.transmit_file(args.filename)
+
if __name__ == "__main__":
main()