Scale Unsigned Integers

Ulf Hamster 3 min.
python scaling

In spike04 a function scale_integer was introduced and it seems to suffer from overflow issues. Of course I forgot about overflow problems with integers.

Old Code

import numpy as np

def scale_integer(x, goal):
    """Old Code
    Example:
    --------
        x = np.array([9, 4, 5, 1, 32])
        goal = 100
        y = scale_integer(x, goal)
    """
    #y = np.round(goal * x / x.sum(), 0).astype(int)  # BUG#1!
    y = np.round(goal * (x / x.sum()), 0).astype(int)  # FIX#1
    #d = goal - y.sum()  # BUG#2: would cast to type(d)=uint
    d = goal - y.sum(dtype=int)  # FIX#2
    if d != 0:
        idx = np.random.permutation(range(len(x)))[:np.abs(d)]
        y[idx] += np.sign(d)  # BUG#3: can underflow from 0 to 255
    return y

New Code

import numpy as np

def scale_uint(x, goal, random_state=None):
    """New Code
    Example:
    --------
        x = np.array([9, 4, 5, 1, 32])
        goal = 100
        y = scale_uint(x, goal)
    """
    # set random seed
    if random_state:
        np.random.seed(random_state)
    
    # read dtype
    dtype = x.dtype
    if dtype not in [np.uint8, np.uint16, np.uint32, np.uint64]:
        raise Exception(f"x is not an unisgned integer: {dtype}")
    
    # cast goal to uint
    goal = np.array([goal], dtype=dtype)[0]
    
    # Scale
    # (these are floating point operations!)
    xflt = np.array(x, dtype=np.float64)
    y = np.round(np.float64(goal) * (xflt / xflt.sum()), 0).astype(dtype)
    
    # "total" should match "goal"
    total = y.sum(dtype=dtype)
    
    # correct difference between goal and total
    # by randomly adding/subtracting by 1
    if goal > total:
        idx1 = np.where(y < np.iinfo(dtype).max)[0]  # find integers < dtype.max
        num = np.minimum(goal - total, idx1.size)  # number of integers to increase by +1
        idx2 = np.random.choice(idx1, size=(num,), replace=False)
        y[idx2] += 1
    elif goal < total:
        idx1 = np.where(y > 0)[0]  # find index of elements x>0
        num = np.minimum(total - goal, idx1.size)  # number of integers to reduce by -1
        idx2 = np.random.choice(idx1, size=(num,), replace=False)
        y[idx2] -= 1
    
    # done
    return y

Comparison 1

goal = 100
x = np.array([9, 4, 5, 1, 31], dtype=np.uint8)
print("sum(x)=", x.sum())
print("scale to", goal)

np.random.seed(42)
y = scale_integer(x, goal)
print("Old Code: y =", y)

y = scale_uint(x, goal, random_state=42)
print("New Code: y =", y)
sum(x)= 50
scale to 100
Old Code: y = [18  8 10  2 62]
New Code: y = [18  8 10  2 62]

Comparison 2

goal = 100
x = np.array([1, 1, 255], dtype=np.uint8)
print("sum(x)=", x.sum())
print("scale to", goal)

np.random.seed(42)
y = scale_integer(x, goal)
print("Old Code: y =", y)

y = scale_uint(x, goal, random_state=42)
print("New Code: y =", y)
sum(x)= 257
scale to 100
Old Code: y = [ 1  0 99]
New Code: y = [ 1  0 99]

Comparison 3

np.random.seed(42)
X = np.random.randint(1, high=100, size=(25000, 100)).astype(np.uint8)
goal = 100
print("Old Code")
np.random.seed(42)
%time Z1 = np.vstack([scale_integer(X[i,:], goal) for i in range(5000)])

print("sum of row (min):", Z1.sum(axis=1).max())
print("sum of row (max):", Z1.sum(axis=1).max())

idx = np.where(Z1.sum(axis=1) != goal)[0]
len(idx)
Old Code
CPU times: user 789 ms, sys: 16.1 ms, total: 805 ms
Wall time: 831 ms
sum of row (min): 100
sum of row (max): 100





0
print("New Code")
%time Z2 = np.vstack([scale_uint(X[i,:], goal, random_state=42) for i in range(5000)])

print("sum of row (min):", Z2.sum(axis=1).max())
print("sum of row (max):", Z2.sum(axis=1).max())

idx = np.where(Z2.sum(axis=1) != goal)[0]
len(idx)
New Code
CPU times: user 821 ms, sys: 31.3 ms, total: 852 ms
Wall time: 857 ms
sum of row (min): 100
sum of row (max): 100





0