/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.reactive.publisher.impl;

import io.reactivex.rxjava3.core.Flowable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import org.infinispan.Cache;
import org.infinispan.commands.GetAllCommandStressTest;
import org.infinispan.commons.marshall.Externalizer;
import org.infinispan.commons.marshall.SerializeWith;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.reactive.publisher.impl.ClusterPublisherManager;
import org.infinispan.reactive.publisher.impl.DeliveryGuarantee;
import org.infinispan.test.fwk.InCacheMode;
import org.reactivestreams.Publisher;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

/**
 * Stress variant of {@link GetAllCommandStressTest} that fetches each worker's keys through
 * {@link ClusterPublisherManager#entryReduction} instead of a regular bulk read.
 *
 * <p>Every worker reduces the entries for its keys into one map per responder, merges those
 * partial maps, and then checks that the merged map has one entry per requested key and that
 * each key maps to a value equal to itself (presumably how the parent test populates the
 * cache; confirm against {@code GetAllCommandStressTest}).
 *
 * <p>Runs in the {@code stress} group against {@code DIST_SYNC} caches with a 900000 ms
 * (15 minute) timeout.
 */
@Test(groups = "stress", testName = "PublisherManagerGetKeyStressTest", timeOut = 900000L)
@InCacheMode(CacheMode.DIST_SYNC)
public class PublisherManagerGetKeyStressTest extends GetAllCommandStressTest {
    /**
     * Finalizer passed to {@code entryReduction}: folds the published partial maps into a single
     * map by copying every later map into the first one emitted.
     */
    private static final Function<Publisher<Map<Integer, Integer>>, CompletionStage<Map<Integer, Integer>>> FINALIZER =
            partials -> Flowable.fromPublisher(partials)
                    .reduce((merged, next) -> {
                        // Reuse the first map as the accumulator rather than allocating a new one.
                        merged.putAll(next);
                        return merged;
                    })
                    .toCompletionStage();

    /**
     * Reads {@code threadKeys} via the cluster publisher and verifies the returned entries.
     *
     * @param cache      cache whose {@link ClusterPublisherManager} component is exercised
     * @param threadKeys keys assigned to the calling worker; exactly these must come back
     * @param iteration  iteration counter supplied by the parent test; not used here
     */
    @Override
    protected void workerLogic(Cache<Integer, Integer> cache, Set<Integer> threadKeys, int iteration) {
        // Looking the component up by class literal yields the raw type; the key/value types
        // are those of the Cache<Integer, Integer> it was obtained from.
        @SuppressWarnings("unchecked")
        ClusterPublisherManager<Integer, Integer> cpm =
                ComponentRegistry.componentOf(cache, ClusterPublisherManager.class);

        // See ClusterPublisherManager#entryReduction for the meaning of the positional
        // false / null / 0L arguments; only the key set, guarantee and functions matter here.
        CompletionStage<Map<Integer, Integer>> stage = cpm.entryReduction(false, null, threadKeys, null, 0L,
                DeliveryGuarantee.EXACTLY_ONCE, MapReducer.getInstance(), FINALIZER);

        // Block the worker thread until the reduction has completed.
        Map<Integer, Integer> results = stage.toCompletableFuture().join();

        AssertJUnit.assertEquals(threadKeys.size(), results.size());
        for (Integer key : threadKeys) {
            AssertJUnit.assertEquals(key, results.get(key));
        }
    }

    /**
     * Transformer passed to {@code entryReduction}: collects every published entry into a
     * {@link HashMap}. Stateless singleton, marshalled through {@link MapReducerExternalizer}.
     *
     * @param <K> entry key type
     * @param <V> entry value type
     */
    @SerializeWith(value = MapReducerExternalizer.class)
    private static class MapReducer<K, V>
            implements Function<Publisher<Map.Entry<K, V>>, CompletionStage<Map<K, V>>> {
        // Kept raw on purpose: the one stateless instance is handed out for every <K, V>.
        @SuppressWarnings("rawtypes")
        private static final MapReducer INSTANCE = new MapReducer();

        private MapReducer() {
        }

        /**
         * Returns the shared reducer typed for the caller's key and value types.
         *
         * @param <K> entry key type
         * @param <V> entry value type
         * @return the singleton instance, viewed as a function over any publisher of
         *         {@code Map.Entry<K, V>} subtypes
         */
        // Unchecked: relies on raw-type conversion of the singleton to the requested signature.
        @SuppressWarnings("unchecked")
        public static <K, V> Function<Publisher<? extends Map.Entry<K, V>>, CompletionStage<Map<K, V>>> getInstance() {
            return INSTANCE;
        }

        /**
         * Collects all entries emitted by {@code entryPublisher} into a new map.
         *
         * @param entryPublisher source of entries to collect
         * @return stage completing with a map holding each emitted entry's key and value
         */
        @Override
        public CompletionStage<Map<K, V>> apply(Publisher<Map.Entry<K, V>> entryPublisher) {
            // Declared as Map<K, V> so the resulting stage matches the declared return type.
            Map<K, V> startingMap = new HashMap<>();
            return Flowable.fromPublisher(entryPublisher)
                    .collectInto(startingMap, (map, e) -> map.put(e.getKey(), e.getValue()))
                    .toCompletionStage();
        }

        /**
         * Externalizer for {@link MapReducer}: writes no payload and always resolves to the
         * singleton on read.
         */
        static final class MapReducerExternalizer
                implements Externalizer<MapReducer> {
            MapReducerExternalizer() {
            }

            /** Writes nothing; the reducer carries no state. */
            public void writeObject(ObjectOutput output, MapReducer object) throws IOException {
            }

            /** Returns the shared {@link MapReducer} instance without reading from {@code input}. */
            public MapReducer readObject(ObjectInput input) throws IOException, ClassNotFoundException {
                return INSTANCE;
            }
        }
    }
}

