Resampling classes across weighted source distributions

mixture-distributionpythonresamplingsamplingunbalanced-classes

I am sure this is a common problem, but googling only yielded false positives. I probably did not know what terms to search for. So here we go:

I have $n$ classes from $m$ different sources. Each source provides $k \leq n$ of the different classes, and the probabilities of the $k$ classes vary between the sources. The examples of the classes also differ somewhat in their quality and overall nature across sources (in this case, images of certain entities). But one source alone does not provide sufficient numbers of examples to train and confidently evaluate a classifier. Hence, I need to combine the sources. But the sources are of varying representativeness for my target distribution $T$. $T$ is one of my sources as well. So I need to build some weighted combination of the sources whilst also accounting for their different probability distributions over the classes.

Are there any standard solutions to a problem like this? I assume so. I came up with the following:

  1. Assign weights $W_s \in \mathbf{R}^m$ to sources
  2. Define target class weights $W_c \in \mathbf{R}^n$ across sources
  3. resample classes within sources to meet $W_c$
  4. resample sources to meet $W_s$

The result is a weighted mixture distribution of the sources according to $W_s$ and $W_c$.

Can you help me with the following questions?

  • Is my solution valid? Are there any statistical pitfalls that I overlooked?
  • Are there better solutions?
  • Are there (Python) packages that solve this more succinctly?
  • What terms do I need to research to find more on this kind of problem?

Thanks!

Here is an example:

enter image description here

For simplicity, I did not use the softening of x ** .85 in the illustration.

import numpy as np
import pandas as pd

n = 10000
source_proportions = (props := [1, .5, .3])/np.sum(props)
examples_from_A = np.random.choice(["A", "B", "C"], int(n * source_proportions[0]), p=(props := [.5, .1, .4])/np.sum(props))
examples_from_B = np.random.choice(["A", "B", "C"], int(n * source_proportions[1]), p=(props := [.4, .4, .2])/np.sum(props))
examples_from_C = np.random.choice(["A", "B", "C"], int(n * source_proportions[2]), p=(props := [1, 1, 1])/np.sum(props))

df = pd.DataFrame(
    data={
        "label": np.concatenate([examples_from_A, examples_from_B, examples_from_C]),
        "source": ["X"]*len(examples_from_A)+["Y"]*len(examples_from_B)+["Z"]*len(examples_from_C)
    }
)


# resample classes within sources
df = pd.concat([transform_class_frequencies(df[df.source == s], {"A": .6, "B": .3, "C": .1}, class_column="label") for s in df.source.unique()])
# resample sources
df = transform_class_frequencies(df, {"X": .3, "Y": .3, "Z": .4}, class_column="source")

The resampling function I defined like this:

from functools import partial
from itertools import compress, starmap
from operator import itemgetter

import numpy as np
import pandas as pd
from sklearn.utils import resample


def downsample(klass, target_size, df, class_column="label"):
    class_df = df[df[class_column] == klass]
    n_examples = len(class_df)

    assert target_size <= n_examples

    surplus = (n_examples - target_size)
    softened_surplus = np.ceil(surplus ** .85).astype(int)
    target = n_examples - softened_surplus

    downsampled_df = resample(class_df, replace=False, n_samples=target, random_state=42)
    return downsampled_df


def upsample(klass, target_size, df, class_column="label"):
    class_df = df[df[class_column] == klass]
    n_examples = len(class_df)

    assert target_size >= n_examples

    missing = (target_size - n_examples)
    softened_missing = np.ceil(missing ** .85).astype(int)

    upsampled_df = resample(class_df, replace=True, n_samples=softened_missing, random_state=42)
    return pd.concat([class_df, upsampled_df])


def transform_class_frequencies(df, class_2_target_class_proportion, class_column="label"):
    """Resamples classes from dataframe to reach a specified class frequency distributions.

    Args:
        df (pandas.DataFrame): Dataframe to resample classes from. Must have a column "label"
        class_2_target_class_proportion (dict): mapping from class names to target proportions
    """
    def normalize(a):
        return a / np.sum(a)

    df_classes, defacto_proportions = zip(*sorted(df[class_column].value_counts().to_dict().items(), key=itemgetter(0)))
    class_2_target_class_proportion = {c: p for c, p in class_2_target_class_proportion.items() if c in df_classes}
    classes, target_class_proportions = zip(*sorted(class_2_target_class_proportion.items(), key=itemgetter(0)))

    target_class_proportions, defacto_proportions = map(normalize, (target_class_proportions, defacto_proportions))

    downsampling_mask = np.array([*(defacto_proportions > target_class_proportions)])
    upsampling_mask = ~downsampling_mask

    target_sizes = np.ceil(len(df) * target_class_proportions).astype(int)

    sampling = lambda sampling_function, sampling_mask: starmap(
        partial(sampling_function, df=df, class_column=class_column),
        compress(zip(classes, target_sizes), sampling_mask)
    )

    df_per_down_class = sampling(downsample, downsampling_mask)
    df_per_up_class = sampling(upsample, upsampling_mask)

    downsampled = pd.concat(df_per_down_class)
    upsampled = pd.concat(df_per_up_class)

    return pd.concat([downsampled, upsampled])

Update

I identified an issue with my solution: If classes are missing from a source, the final mixture will be skewed and require another round of resampling of the classes. That introduced potential upsampling of previously downsampled classes. Since I am not dealing with generative distributions, for me that means either a loss of variance in my final distribution or the need to implement an ugly lookup mechanism, that only clones data points when upsampling, if there are no "reserve" data point from previous downsamplings.

Hence, I came up with a solution that circumvents this. This solution computes the outer product (semantically, not actually in the implementation) between the $W_s$ and $W_c$, then partitions the data by classes $\times$ sources and then upsamples or downsamples each partition only once.

But this is still missing a crucial component, which is the implementation of the function whatthehelldoIknow. The remaining issue is, that the final distribution does not match $W_c$ anymore.

whatthehelldoIknow computes

$$\underset{P_s'}{\text{argmin}} \quad D_{KL}(P_s ~||~ P_s') \quad\text{s.t.} \quad \operatorname{mix}(\text{data}, P_c, P_s') = P_c$$

where $P_c$ and $P_s$ are the desired distribution over classes and sources. In other words they are the normalized $W_c$ and $W_s$. Hence, given a $P_c$ the function finds $P_s'$ closest to $P_s$ that satisfies the constraint that the mixture looks like the desired distribution over classes. Does that make sense?

def __upsample(class_df, target_size, smoothing_factor=.85):
    n_examples = len(class_df)
    assert target_size >= n_examples
    missing = (target_size - n_examples)
    softened_missing = np.ceil(missing ** smoothing_factor).astype(int)
    upsampled_df = resample(class_df, replace=True, n_samples=softened_missing, random_state=42)
    return upsampled_df


def __downsample(df, n_samples, smoothing_factor=.85):
    n_examples = len(df)
    assert n_samples <= n_examples
    surplus = (n_examples - n_samples)
    softened_surplus = np.ceil(surplus ** smoothing_factor).astype(int)
    target = n_examples - softened_surplus
    downsampled_df = resample(df, replace=False, n_samples=target, random_state=42)
    return downsampled_df


def normalize(a):
    a = np.array(a)
    return a / np.sum(a)


def normalize_proportions(df_classes: List[str], class_2_proportion):
    assert sorted({*class_2_proportion.keys()}) == df_classes
    class_2_proportion = dict(zip(class_2_proportion.keys(), normalize(list(class_2_proportion.values()))))
    print(class_2_proportion)
    return np.array([class_2_proportion.get(c, 0) for c in df_classes])


def get_unique_sorted_values(column):
    return sorted(column.unique())


def resample_sub_df(df, n_samples, smoothing_factor=.85):
    difference = n_samples - len(df)
    down = lambda: __downsample(df, n_samples, smoothing_factor=smoothing_factor)
    up = lambda: pd.concat([df, __upsample(df, n_samples, smoothing_factor=smoothing_factor)])
    return down() if difference < 0 else up()


def mix(df, class_proportions, source_proportions, class_column="label", source_column="source", smoothing_factor=.85):

    classes, sources = map(get_unique_sorted_values, [df[class_column], df[source_column]])

    class_probs = normalize_proportions(classes, class_proportions)
    source_probs = normalize_proportions(sources, source_proportions)


    n_samples_per_sub_df = np.ceil(np.multiply(*zip(*product(class_probs, source_probs))) * len(df)).astype(int)

    sub_df_keys, n_samples_per_sub_df = map(
        list, zip(*sorted(zip(product(classes, sources), n_samples_per_sub_df), key=itemgetter(0)))
    )

    sub_df_keys_actual, sub_dfs = zip(*df.groupby([class_column, source_column]))

    mask = [k in sub_df_keys_actual for k in sub_df_keys]
    n_samples_per_sub_df = compress(n_samples_per_sub_df, mask)

    aggr_df = pd.concat(
        starmap(partial(resample_sub_df, smoothing_factor=smoothing_factor), zip(sub_dfs, n_samples_per_sub_df))
    ).reset_index(drop=True)

    return aggr_df


def whatthehelldoIknow():
    # given a target class distribution P_c and a target source distrinution P_s, find
    # argmin P_s'  D_KL(P_s || P_s') such that mix(data, P_c, P_s') = P_c.
    pass

Update 2

Ok, instead of solving that equation, I took a dirty solution of distributing the mass that was allocated to empty partitions over the non-empty partitions:

def mix(df, class_proportions, source_proportions, class_column="label", source_column="source", smoothing_factor=.85):

    classes, sources = map(get_unique_sorted_values, [df[class_column], df[source_column]])

    class_probs = normalize_proportions(classes, class_proportions)
    source_probs = normalize_proportions(sources, source_proportions)

    proportion_per_sub_df = np.outer(class_probs, source_probs)

    shape = proportion_per_sub_df.shape
    proportion_per_sub_df = proportion_per_sub_df.ravel()

    assert np.allclose(proportion_per_sub_df, np.multiply(*zip(*product(class_probs, source_probs))))

    sub_df_keys, proportion_per_sub_df = map(
        list, zip(*sorted(zip(product(classes, sources), proportion_per_sub_df), key=itemgetter(0)))
    )
    proportion_per_sub_df = np.array(proportion_per_sub_df)

    sub_df_keys_actual, sub_dfs = zip(*df.groupby([class_column, source_column]))

    mask = np.array([k in sub_df_keys_actual for k in sub_df_keys])

    proportion_per_sub_df[np.where(~mask)] = 0
    proportion_per_sub_df = proportion_per_sub_df.reshape(shape)
    missing_mass = class_probs - proportion_per_sub_df.sum(axis=1)
    missing_mass_mean_per_class = missing_mass / np.sum(proportion_per_sub_df != 0, axis=1)
    proportion_per_sub_df += (proportion_per_sub_df != 0) * missing_mass_mean_per_class.reshape(-1, 1)

    proportion_per_sub_df = proportion_per_sub_df.ravel()
    n_samples_per_sub_df = np.ceil(proportion_per_sub_df * len(df)).astype(int)
    n_samples_per_sub_df = compress(n_samples_per_sub_df, mask)

    aggr_df = pd.concat(
        starmap(partial(resample_sub_df, smoothing_factor=smoothing_factor), zip(sub_dfs, n_samples_per_sub_df))
    ).reset_index(drop=True)
    
    return aggr_df

This "works". It produces the desired distribution of classes in the mixture distribution, but it probably is not the optimal solution accordings to the minimization problem form above.

Update 3

For what it's worth, here is a refactored and tested version, in case anybody wants to use or adapt it, who is facing this problem:

from functools import partial
from itertools import starmap, product
from operator import itemgetter, truth
from typing import List

import numpy as np
import pandas as pd
from sklearn.utils import resample


def __upsample(class_df, target_size, smoothing_factor=0.85):
    n_examples = len(class_df)
    assert target_size >= n_examples
    missing = target_size - n_examples
    softened_missing = np.ceil(missing ** smoothing_factor).astype(int)
    upsampled_df = resample(class_df, replace=True, n_samples=softened_missing, random_state=42)
    return upsampled_df


def __downsample(df, n_samples, smoothing_factor=0.85):
    n_examples = len(df)
    assert n_samples <= n_examples
    surplus = n_examples - n_samples
    softened_surplus = np.ceil(surplus ** smoothing_factor).astype(int)
    target = n_examples - softened_surplus
    downsampled_df = resample(df, replace=False, n_samples=target, random_state=42)
    return downsampled_df


def normalize(a):
    a = np.array(a)
    return a / np.sum(a)


def normalize_proportions(classes: List[str], class_2_proportion):
    assert sorted({*class_2_proportion.keys()}) == classes
    class_2_proportion = dict(zip(class_2_proportion.keys(), normalize(list(class_2_proportion.values()))))
    return np.array([class_2_proportion.get(c, 0) for c in classes])


def get_unique_sorted_values(column):
    return sorted(column.unique())


def resample_partition(df, n_samples, smoothing_factor=0.85):
    difference = n_samples - len(df)
    down = lambda: __downsample(df, n_samples, smoothing_factor=smoothing_factor)
    up = lambda: pd.concat([df, __upsample(df, n_samples, smoothing_factor=smoothing_factor)])
    return down() if difference < 0 else up()


def __raveled_matrix_has_same_order_as_cross_product_of_vectors(matrix, v, w):
    return np.allclose(matrix, np.multiply(*zip(*product(v, w))))


def mix(df, class_proportions, source_proportions, class_column="label", source_column="source", smoothing_factor=0.85):
    def classes(df):
        return get_unique_sorted_values(df[class_column])

    def sources(df):
        return get_unique_sorted_values(df[source_column])

    def rel_class_frqs(df):
        return normalize_proportions(classes(df), class_proportions)

    def rel_source_frqs(df):
        return normalize_proportions(sources(df), source_proportions)

    def compute_sampling_matrix(df):
        proportion_per_partition = np.outer(rel_class_frqs(df), rel_source_frqs(df)).ravel()
        assert __raveled_matrix_has_same_order_as_cross_product_of_vectors(
            proportion_per_partition, rel_class_frqs(df), rel_source_frqs(df)
        )
        return proportion_per_partition

    def partition_proportions_by_class_and_source(df):
        proportion_per_partition = compute_sampling_matrix(df)
        partition_keys, proportion_per_partition = map(
            list, zip(*sorted(zip(product(classes(df), sources(df)), proportion_per_partition), key=itemgetter(0)))
        )
        proportion_per_partition = np.array(proportion_per_partition)
        return partition_keys, proportion_per_partition

    def partition_df_by_class_and_source(df):
        return zip(*df.groupby([class_column, source_column]))

    def mask_empty_partitions(non_empty_partitions_keys, partition_keys):
        return np.array([k in non_empty_partitions_keys for k in partition_keys])

    def zero_out_proportions_of_empty_partitions(proportion_per_partition, mask):
        proportion_per_partition_nulled = np.where(mask, proportion_per_partition, 0).reshape((len(classes(df)), -1))
        return proportion_per_partition_nulled

    def redistribute_empty_partition_mass(proportion_per_partition, mask):
        proportion_per_partition_zeroed_out = zero_out_proportions_of_empty_partitions(proportion_per_partition, mask)
        missing_mass = rel_class_frqs(df) - proportion_per_partition_zeroed_out.sum(axis=1)
        mean_missing_mass_per_class = missing_mass / np.sum(proportion_per_partition_zeroed_out != 0, axis=1)
        redistributed_mass = (proportion_per_partition_zeroed_out != 0) * mean_missing_mass_per_class.reshape(-1, 1)
        proportion_per_partition_redistributed = np.ravel(proportion_per_partition_zeroed_out + redistributed_mass)
        return proportion_per_partition_redistributed

    def compute_required_samples_per_partition(proportion_per_partition_redistributed):
        n_samples_per_partition = np.ceil(proportion_per_partition_redistributed * len(df)).astype(int)
        n_samples_per_partition = filter(truth, n_samples_per_partition)
        return n_samples_per_partition

    def resample_and_aggregate_partitions(partitions, n_samples_per_partition):
        aggr_df = pd.concat(
            starmap(
                partial(resample_partition, smoothing_factor=smoothing_factor), zip(partitions, n_samples_per_partition)
            )
        ).reset_index(drop=True)
        return aggr_df

    partition_keys, proportion_per_partition = partition_proportions_by_class_and_source(df)
    non_empty_partitions_keys, partitions = partition_df_by_class_and_source(df)
    mask = mask_empty_partitions(non_empty_partitions_keys, partition_keys)
    proportion_per_partition_redistributed = redistribute_empty_partition_mass(proportion_per_partition, mask)
    n_samples_per_partition = compute_required_samples_per_partition(proportion_per_partition_redistributed)
    aggr_df = resample_and_aggregate_partitions(partitions, n_samples_per_partition)

    return aggr_df

Best Answer

This is a problem known as "domain adaptation", where your training data are distributed differently than the population you intend to apply fitted models to.

In Python, there is a handy package called Adapt, and I think (more robust) solutions conceptually similar to the original post's approach can be found in the "instance based" weighting options.

Related Question