/**
* Estimate the number of distinct elements in a data stream
* (F0 estimation problem) based on the algorithm published in the
* Proceedings of 30th Annual European Symposium on Algorithms (ESA 2022)
* https://doi.org/10.48550/arXiv.2301.10191
*/
import java.util.HashSet;
import java.util.Random;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.IntStream;
import java.util.stream.Stream;
public class F0Estimator {
// Probability to add an element
private double p = 1.0;
/**
* Estimate number of unique elements in the passed stream
* @param storageSize The storage to use
*/
public <T> long uniqueElements(Stream<T> stream, int storageSize) {
final float LOAD_FACTOR = 0.5f;
Set<T> X = new HashSet<>(storageSize , LOAD_FACTOR);
Random random = new Random();
stream.forEach(element -> {
// Ensure element is in the set with probability p
if (random.nextDouble() < p)
X.add(element);
else
X.remove(element);
if (X.size() >= storageSize) {
// Randomly keep each element in X with probability 1/2
X.removeIf(e -> random.nextDouble() < 0.5);
p /= 2;
if (X.size() >= storageSize) {
throw new IllegalStateException("Threshold exceeded after sampling");
}
}
});
return (long) (X.size() / p);
}
public static void main(String[] args) {
// Create a Random instance
Random random = new Random();
// Create a stream of many random integers
Stream<Integer> stream = IntStream
.generate(random::nextInt).limit(1_000_000_000)
.map(i -> i & 0xffff)
.boxed();
final int STORAGE_SIZE = 1000;
var estimator = new F0Estimator();
long uniqueCount = estimator.uniqueElements(stream, STORAGE_SIZE);
System.out.println("Estimated number of unique elements: " + uniqueCount);
}
}