Flash Programmer Client

This is a co-project to my Arduino Parallel Flash Programmer. It can be used to upload, download and verify files in .bin and a special hex format. Furthermore, it incorporates a manual mode which lets you to edit the flash byte by byte or in 256 byte blocks. The program is written in python using pySerial but can also be downloaded as standalone .exe version which was compiled using PyInstaller. If you are interested in the source code, there is a code listing down below.

# Title       : Flash Programmer PC Client
# Author      : R.Lux
# Version     : 1.0
# Last edited : 01.10.2018

import sys
import serial
import time
import math

global data_buffer
global device_buffer
global flash_id

flash_id = 0
bytes_in_flash = 0
programmer_id = b'\x01FLASHPROG. V1.00'
CRC = 0
data_buffer = [0xFF] * 524288
device_buffer = [0xFF] * 256



def byte(number, i):
   
    return (number & (0xff << (i * 8))) >> (i * 8)



def display_help():
   
    print("   Flash Programmer CLI Tool  ")
    print("   usage: programmer_cli PORT ")
    print("   -m:i/w/r/wl/rl/e/el:A:X...X")
    print("     i  - get Device ID (ignore A and X")
    print("     w  - write byte X to address A ")
    print("     r  - read byte from address A and return byte")
    print("     wl - write 256 bytes to address A (in this case only the upper 16 bits of A are used)")
    print("     rl - read 256 bytes from address A (in this case only the upper 16 bits of A are used)")
    print("     e  - erase sector in which the address A is located")
    print("     el - erase whole device")
    print("   -f:w/r/v:FILENAME:FILE_OFFSET ")
    print("     w  - write from file to device")
    print("     r  - write from device to file")
    print("     v  - compare file with device")
    print(" (all arguments as hex with 0x preamble)")
    print(" (A = 24 bit, X space seperated and 8 bits wide)")


def open_port(portname):
   
    global ser
    ser = serial.Serial(portname, 115200, timeout=1)
    time.sleep(2)
    # Check if programmer is attached to this port
    ser.reset_input_buffer()
    ser.flush()
    try_index = 0
    while try_index < 4:
        ser.write(b'\x01')
        response = ser.read(18)
        ser.flush()
        if response == programmer_id:
            print("Found device on port " + portname)
            return
        try_index += 1
       
    ser.close()
    raise Exception("Error: Flash programmer wasnt found on serial port " + portname)
   
   
   
def upload_file(filename, offset):
   
    load_buffer(filename, offset)
    write_data_buffer_device()


def download_file(filename, offset):
   
    read_data_buffer_device()
    save_buffer(filename, offset)

   
def verify_file(filename, offset):
   
    load_buffer(filename, offset)
    verify_data_buffer()

def load_buffer(filename, offset):
    global data_buffer

    if filename.split(".")[1] == "shex":
        # We are in simple hex mode
        line_pointer = 0
        load_file = open(filename)
        while line_pointer != 32766:
            line = load_file.readline()
            line = line.replace('\n', '')
            if line == "":
                # Seems like we reached the end of the file
                break
            line = line.split(':')
            address_part = line[0]
            data_part = line[1]
            line_address = int(address_part, 16)
            if (line_address - offset) >= 0:
                # We have reached the offset
                data_part = data_part.split(" ")
   
                # Transfer all 16 bytes to the data buffer
                byte_pointer = 0
                while True:
                    data_buffer[(line_address) + byte_pointer] = int(data_part[(byte_pointer + 1)], 16)
                    byte_pointer += 1
                    if byte_pointer == 16:
                        # We transferred all 16 bytes
                        break
                line_pointer += 1
        load_file.close()
       
    elif filename.split(".")[1] == "bin":
        load_file = open(filename, "rb")
        for data_pointer in range(0, 524288):
            data_buffer[data_pointer] = int.from_bytes(load_file.read(1), byteorder='little')  
        load_file.close()


    elif filename.split(".")[1] == "hex":
        pass
       
    else:
        raise Exception("Filetype " + filename.split(".")[1] + " isnt supported!")
   

def save_buffer(filename, offset):
    if filename.split(".")[1] == "shex":
        save_file = open(filename, "w")
        data_pointer = 0
        while(data_pointer != 524288):
            # Write line address
            save_file.write("%06X:" % int(data_pointer + offset))
            while True:
                save_file.write(" %02X" % data_buffer[data_pointer])
                data_pointer += 1
                if(data_pointer%16 == 0):
                    break
            save_file.write('\n')
        save_file.close()

    elif filename.split(".")[1] == "bin":
        save_file = open(filename, "wb")
        #for data_pointer in range(0, 524288):
        save_file.write(bytes(data_buffer))
        save_file.close()

    else:
        raise Exception("Filetype " + filename.split(".")[1] + " isnt supported!")
       

       
def get_chip_id():
    global flash_id

    print("Reading flash ID...", end='')
    ser.write(b'\x02')
    ser.flush()
    flash_id = ser.read(2)[1]

    print("ID is " + hex(flash_id))


def calculate_device_buffer_CRC():
    global CRC
   
    CRC = 0
    for crc_pointer in range(0, 256):
        CRC = device_buffer[crc_pointer] + CRC
       

   
def check_for_flash_size():
    global bytes_in_flash
   
    if flash_id == 0xB7:
        bytes_in_flash = 2**19
    elif flash_id == 0xB6:
        bytes_in_flash = 2**18
    elif flash_id == 0xB5:
        bytes_in_flash = 2**17

       
    else:
        raise Exception("Device isnt supported (Device Cocde: " + hex(flash_id) + ")")


def read_data_buffer_device():
    global data_buffer

    print("\nReading...\n0%                                  100%\n")
    progress_len = bytes_in_flash/40
   
    start_time = time.time()
    data_buffer_pointer = 0
    while data_buffer_pointer < bytes_in_flash:
        # Get Bytes from device
        read_device_buffer_device(byte(data_buffer_pointer, 2), byte(data_buffer_pointer, 1))
       
        # Copy from device buffer to the data buffer
        for x in range(0, 256):
           data_buffer[(data_buffer_pointer + x)] = device_buffer[x]
           if (((data_buffer_pointer + x))%progress_len)//1 == 0:
            print("=", end='', flush=True)
        data_buffer_pointer += 256

    print("")

    print("Read " + str(bytes_in_flash) + " Bytes in " + str(round(time.time()-start_time, 2)) + " s.")
    print("Byterate: " + str(round((bytes_in_flash/(time.time()-start_time))/1024, 1)) + "KByte/s.")
   
def write_data_buffer_device():
    global device_buffer


    print("\nWriting...\n0%                                  100%\n")
    progress_len = bytes_in_flash/40
   
    start_time = time.time()
    data_buffer_pointer = 0
    while data_buffer_pointer < bytes_in_flash:
        # Copy from data buffer to device buffer
        for x in range(0, 256):
            device_buffer[x] = data_buffer[(data_buffer_pointer + x)]
            if (((data_buffer_pointer + x))%progress_len)//1 == 0:
                print("=", end='', flush=True)

        write_device_buffer_device(byte(data_buffer_pointer, 2), byte(data_buffer_pointer, 1))
        data_buffer_pointer += 256


    print("")
    print("Wrote " + str(bytes_in_flash) + " Bytes in " + str(round(time.time()-start_time, 2)) + " s.")
    print("Byterate: " + str(round((bytes_in_flash/(time.time()-start_time))/1024, 1)) + "KByte/s.")
       

       
def read_device_buffer_device(address_high, address_mid):
    global device_buffer
   
    crcs_match = False
    try_index = 0
    while (not crcs_match) and try_index < 4:
        message = [0x7, address_high, address_mid]
        ser.write(message)
        response = ser.read(259)
        #print(response)
        if response[0] == 0x07:
            # We got the bytes back
            device_buffer = response[1:257]
            # Calculate CRC for given byte block
            calculate_device_buffer_CRC()
            # check if CRCs match
            if bytes(divmod(CRC, 256)) == response[257:]:
                # CRCs match!
                crcs_match = True
        try_index += 1
       
    if not crcs_match:
        raise Exception("Cant read Sector from 0x%0.6X" % (address_high*65536 + address_mid*256))
       
   
def write_device_buffer_device(address_high, address_mid):
   
    #print("Writing Buffer to 0x%0.6X" % (address_high*65536 + address_mid*256))
    calculate_device_buffer_CRC()
    # Create response piece by piece
    ser.flush()
    # Write header
    message = [8, 0x55, address_high, address_mid]
    # Write data
    message += device_buffer
    message += divmod(CRC, 256)
    index = 0
    response = 2
    while response != 0:
        ser.write(message)
        ser.flush()
        response = ser.read(2)[1]
        index += 1
        if index == 3:
            raise Exception("Cant write Sector at " + str(address_high) + " " + str(address_mid) + "!")
       

def read_byte_device(address_high, address_mid, address_low):
   
    ser.flush()
    message = [0x05, address_high, address_mid, address_low]
    try_index = 0
    while try_index < 4:
        ser.write(message)
        response = ser.read(2)
        if response[0] == b'\x05':
            return response[1]
        try_index += 1
    raise Exception("Cant read single byte at 0x%0.6X" % (address_high*65536 + address_mid*256 + address_low))
   
def write_byte_device(address_high, address_mid, address_low, data):

    ser.flush()
    message = [0x06, address_high, address_mid, address_low, data]
    try_index = 0
    while try_index < 4:
        ser.write(message)
        response = ser.read(2)
        if response[1] == b'\x00':
            return
        try_index += 1
    raise Exception("Cant write single byte at 0x%0.6X" % (address_high*65536 + address_mid*256 + address_low))    


def erase_device():

    print("Erasing whole device...", end='')
    ser.flush()
    message = [0x03, 0x55]
    try_index = 0
    while try_index < 4:
        ser.write(message)
        ser.flush()
        time.sleep(0.1)
        response = ser.read(2)
        if response[1] == 0x00:
            print("done")
            return
        try_index += 1
    raise Exception("Cant erase whole device!")


def erase_sector(address_high, address_mid):

   
    print("Erasing sector 0x%0.6X ..." % (address_high*65536 + address_mid*256), end='')
    message = [0x04, 0x55, address_high, address_mid]
    ser.write(message)
    try_index = 0
    while try_index < 4:
        response = ser.read(2)
        if response[1] == b'\x00':
            print("done")
            return
        try_index += 1
    raise Exception("Cant erase sector starting from 0x%0.6X" % (address_high*65536 + address_mid*256))
   
   
def verify_data_buffer():
   
    print("\nVerifying...\n0%                                  100%\n")
    progress_len = bytes_in_flash/40

   
    start_time = time.time()
    data_buffer_pointer = 0
    while data_buffer_pointer < bytes_in_flash:
        # Get Bytes from device
        read_device_buffer_device(byte(data_buffer_pointer, 2), byte(data_buffer_pointer, 1))
       
        # Copy from data buffer to device buffer
        for x in range(0, 256):
            if data_buffer[(data_buffer_pointer + x)] != device_buffer[x]:
                raise Exception("Bytes at 0x%0.6X" % (data_buffer_pointer + x) + " dont match! File: 0x%0.2X" % data_buffer[(data_buffer_pointer + x)] + " Device: 0x%0.2X" % device_buffer[x])

            if (((data_buffer_pointer + x))%progress_len)//1 == 0:
                print("=", end='', flush=True)
        data_buffer_pointer += 256

    print("")
    print("Verified " + str(bytes_in_flash) + " Bytes in " + str(round(time.time()-start_time, 2)) + " s.")
    print("Byterate: " + str(round((bytes_in_flash/(time.time()-start_time))/1024, 1)) + "KByte/s.")
   

   
print("Flash Programmer V1.00")    
if len(sys.argv) == 1:
    # No arguments
    display_help()
   
else:
    open_port(sys.argv[1])
    get_chip_id()
    check_for_flash_size()

    command = sys.argv[2].split(":")
    # -f argument
    if command[0] == "-f":
        # Check if filetype is known
        if not ((command[2].split(".")[1] == "shex") or (command[2].split(".")[1] == "bin")):
            raise Exception("Filetype ." + command[2].split(".")[1] + " isnt supported!")
           
        # File Mode
        if command[1] == "w":
            upload_file(command[2], int(command[3], 16))
           
        elif command[1] == "r":
            download_file(command[2], int(command[3], 16))
           
        elif command[1] == "v":
            verify_file(command[2], int(command[3], 16))
           
    elif command[0] == "-m":
        # Manual Mode
        if command[1] == "i":
            print("Flash ID: " + hex(flash_id) + " with " + str(bytes_in_flash) + " Bytes")
           
        elif command[1] == "r":
            address = int(command[2], 16)
            print(hex(read_byte_device(byte(address, 2), byte(address, 1), byte(address, 0))))
           
        elif command[1] == "w":
            address = int(command[2], 16)
            write_byte_device(byte(address, 2), byte(address, 1), byte(address, 0), command[3])
           
        elif command[1] == "e":
            erase_device()

        elif command[1] == "rl":
            address = int(command[2], 16)
            read_device_buffer_device(byte(address, 2), byte(address, 1))
           
            for index in range(0, 256):
                print(" 0x%0.2X" % device_buffer[index], end = '')

        elif command[1] == "wl":
            address = int(command[2], 16)
            data_array = bytes(command[3].split(" "))
            for index in range(0, 256):
                device_buffer[index] = data_array[index]

            write_device_buffer_device(byte(address, 2), byte(address, 1))

        elif command[1] == "el":
            erase_device()

        else:
            print("Error: unknown operation " + command[1])
           
    ser.close()