Project author: richardstartin

Project description:
Parallel boolean circuit evaluation
Language: Java
Repository: git://github.com/richardstartin/splitmap.git
Created: 2018-02-04T17:11:03Z
Project community: https://github.com/richardstartin/splitmap

License: Apache License 2.0


splitmap

This library builds on top of RoaringBitmap to provide a parallel implementation of boolean circuits (multidimensional filters) and arbitrary aggregations over filters.
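
To make the idea of evaluating a filter partition by partition concrete, here is a minimal sketch that uses only `java.util.BitSet` from the standard library. It is not splitmap's implementation and does not use RoaringBitmap; the class name `PartitionedFilterSketch`, the partition size, and the method names are all hypothetical and chosen for illustration.

```java
import java.util.BitSet;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

/** Hypothetical illustration only; not part of splitmap. */
class PartitionedFilterSketch {

    // Rows per partition; an arbitrary choice for this sketch.
    static final int PARTITION_SIZE = 1 << 16;

    /** Splits row ids into per-partition bit sets keyed by partition number. */
    static Map<Integer, BitSet> index(int... rows) {
        Map<Integer, BitSet> partitions = new TreeMap<>();
        for (int row : rows) {
            partitions.computeIfAbsent(row / PARTITION_SIZE, k -> new BitSet(PARTITION_SIZE))
                      .set(row % PARTITION_SIZE);
        }
        return partitions;
    }

    /** Evaluates left AND right one partition at a time, in parallel. */
    static Map<Integer, BitSet> and(Map<Integer, BitSet> left, Map<Integer, BitSet> right) {
        return left.entrySet().parallelStream()
            // a partition missing on either side cannot contribute to an AND
            .filter(e -> right.containsKey(e.getKey()))
            .map(e -> {
                BitSet result = (BitSet) e.getValue().clone(); // leave the inputs untouched
                result.and(right.get(e.getKey()));
                return Map.entry(e.getKey(), result);
            })
            .filter(e -> !e.getValue().isEmpty())
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    /** Counts the rows selected across all partitions. */
    static long cardinality(Map<Integer, BitSet> filter) {
        return filter.values().stream().mapToLong(BitSet::cardinality).sum();
    }
}
```

Because each partition is processed independently, partitions that appear in only one of the two filters are skipped without touching any bits, which is the kind of shortcut a partitioned layout makes possible.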

For instance, to compute a sum product on a dataset filtered such that both of two conditions hold:

  // Assumes: import java.util.Map; import static java.util.Map.entry;
  PrefixIndex<ChunkedDoubleArray> quantities = ...
  PrefixIndex<ChunkedDoubleArray> prices = ...
  SplitMap februarySalesIndex = ...
  SplitMap luxuryProductsIndex = ...
  // Register the filters and the metrics under their keys
  QueryContext<String, PriceQty> context = new QueryContext<>(
      Map.ofEntries(entry("luxuryProducts", luxuryProductsIndex), entry("febSales", februarySalesIndex)),
      Map.ofEntries(entry(PRICE, prices), entry(QTY, quantities)));
  // Intersect the two filters, then sum price * quantity over the surviving rows
  double februaryRevenueFromLuxuryProducts =
      Circuits.evaluateIfKeysIntersect(context, slice -> slice.get("febSales").and(slice.get("luxuryProducts")), "febSales", "luxuryProducts")
          .stream()
          .parallel()
          .mapToDouble(partition -> partition.reduceDouble(SumProduct.<PriceQty>reducer(prices, quantities)))
          .sum();

Over millions of quantities and prices, this can be computed in under 200 microseconds on a modern processor, whereas an approach based on parallel streams alone may take upwards of 20 ms.
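
For a sense of what a computation built on parallel streams alone might look like, the sketch below filters row by row with two `java.util.BitSet` instances and sums the products of the matching rows. This is an assumed baseline written for illustration, not the author's benchmark code; the class and method names are hypothetical.

```java
import java.util.BitSet;
import java.util.stream.IntStream;

/** Hypothetical baseline for comparison; not the author's benchmark code. */
class StreamSumProductBaseline {

    /** Sums prices[i] * quantities[i] over rows where both filters are set. */
    static double sumProduct(BitSet febSales, BitSet luxuryProducts,
                             double[] prices, double[] quantities) {
        return IntStream.range(0, prices.length)
            .parallel()
            // every row is tested individually, even in regions where no row matches
            .filter(i -> febSales.get(i) && luxuryProducts.get(i))
            .mapToDouble(i -> prices[i] * quantities[i])
            .sum();
    }
}
```

A row-at-a-time filter like this visits every index, whereas a partitioned bitmap can discard whole ranges of rows before any arithmetic is done.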

It is easy to write arbitrary routines combining filtering, calculation and aggregation, for example statistical calculations evaluated against filter criteria:

  public double productMomentCorrelationCoefficient() {
    // calculate the correlation coefficient between prices observed on different exchanges
    PrefixIndex<ChunkedDoubleArray> exchange1Prices = ...
    PrefixIndex<ChunkedDoubleArray> exchange2Prices = ...
    SplitMap beforeClose = ...
    SplitMap afterOpen = ...
    QueryContext<Exchange, PriceQty> context = new QueryContext<>(
        Map.ofEntries(entry(BEFORE_CLOSE, beforeClose), entry(AFTER_OPEN, afterOpen)),
        Map.ofEntries(entry(NASDAQ, exchange1Prices), entry(LSE, exchange2Prices)));
    // evaluate product moment correlation coefficient
    return Circuits.evaluate(context, slice -> slice.get(BEFORE_CLOSE).or(slice.get(AFTER_OPEN)),
            BEFORE_CLOSE, AFTER_OPEN)
        .stream()
        .parallel()
        .map(partition -> partition.reduce(SimpleLinearRegression.<Exchanges>reducer(exchange1Prices, exchange2Prices)))
        .collect(SimpleLinearRegression.pmcc());
  }
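
The pattern in the routine above, reducing each partition to a partial result and then collecting the partials into one statistic, can be sketched in plain Java as follows. This is not the library's `SimpleLinearRegression`; the class `PmccSketch` and its methods are hypothetical, and the sketch uses the textbook sums-of-products formula, which is simple but not the most numerically robust choice.

```java
import java.util.stream.Collector;
import java.util.stream.IntStream;

/** Hypothetical mergeable accumulator for the product moment correlation coefficient. */
class PmccSketch {
    long n;
    double sx, sy, sxx, syy, sxy;

    /** Adds one (x, y) observation to the running sums. */
    void add(double x, double y) {
        n++; sx += x; sy += y; sxx += x * x; syy += y * y; sxy += x * y;
    }

    /** Folds another partial result into this one; the sums are simply additive. */
    PmccSketch merge(PmccSketch other) {
        n += other.n; sx += other.sx; sy += other.sy;
        sxx += other.sxx; syy += other.syy; sxy += other.sxy;
        return this;
    }

    /** r = (n*Sxy - Sx*Sy) / sqrt((n*Sxx - Sx^2) * (n*Syy - Sy^2)) */
    double pmcc() {
        double cov = n * sxy - sx * sy;
        double varX = n * sxx - sx * sx;
        double varY = n * syy - sy * sy;
        return cov / Math.sqrt(varX * varY);
    }

    /** Collects per-partition partial results into a single coefficient. */
    static Collector<PmccSketch, PmccSketch, Double> collector() {
        return Collector.of(PmccSketch::new, PmccSketch::merge, PmccSketch::merge, PmccSketch::pmcc);
    }

    /** Reduces each partition independently in parallel, then combines the partials. */
    static double correlate(double[] x, double[] y, int partitionSize) {
        int partitions = (x.length + partitionSize - 1) / partitionSize;
        return IntStream.range(0, partitions)
            .parallel()
            .mapToObj(p -> {
                PmccSketch partial = new PmccSketch();
                int end = Math.min(x.length, (p + 1) * partitionSize);
                for (int i = p * partitionSize; i < end; i++) {
                    partial.add(x[i], y[i]);
                }
                return partial;
            })
            .collect(collector());
    }
}
```

The statistic parallelizes cleanly because the five running sums are additive, so partial results can be merged in any order before the final division.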