package cc.alcina.framework.servlet.cluster.transform;

import cc.alcina.framework.common.client.WrappedRuntimeException;
import cc.alcina.framework.common.client.util.Ax;
import cc.alcina.framework.entity.logic.EntityLayerUtils;
import cc.alcina.framework.entity.persistence.domain.DomainStore;
import cc.alcina.framework.entity.registry.ClassLoaderAwareRegistryProvider;
import cc.alcina.framework.entity.transform.DomainTransformRequestPersistent;
import cc.alcina.framework.entity.transform.event.DomainTransformPersistenceEvent;
import cc.alcina.framework.entity.transform.event.DomainTransformPersistenceQueue;
import cc.alcina.framework.entity.util.OffThreadLogger;
import cc.alcina.framework.servlet.cluster.transform.ClusterTransformRequest;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;

/* loaded from: input_file:alcina-servlet.jar:cc/alcina/framework/servlet/cluster/transform/ClusterTransformListener.class */
/**
 * Persistence listener that relays local domain transform persistence events
 * to a {@link TransformCommitLog} and feeds messages consumed from that log
 * back into the local {@link DomainTransformPersistenceQueue}.
 *
 * <p>
 * For locally originated PRE_COMMIT events the persisting thread publishes the
 * requests and then blocks (bounded by {@link #PRE_COMMIT_AWAIT_TIMEOUT_MS})
 * until the PRE_COMMIT message whose request id equals the event's max
 * persisted request id has been consumed by
 * {@link #handleClusterTransformRequest(ClusterTransformRequest)}.
 *
 * <p>
 * Latches are kept in a {@link ConcurrentHashMap} because they are registered
 * by the persisting thread and released by whichever thread invokes the
 * commit log consumer callback.
 */
public class ClusterTransformListener implements ExternalTransformPersistenceListener {
    /**
     * Upper bound, in milliseconds, on how long a persisting thread waits for
     * its own PRE_COMMIT message to be consumed before carrying on regardless.
     */
    private static final long PRE_COMMIT_AWAIT_TIMEOUT_MS = 300000L;

    private final TransformCommitLog transformCommitLog;
    private final DomainStore domainStore;
    private final TransformCommitLogHost commitLogHost;
    Logger logger = OffThreadLogger.getLogger(getClass());

    /**
     * Latches of in-flight PRE_COMMIT waits, keyed by the max persisted request
     * id of the event that created them.
     */
    private final ConcurrentHashMap<Long, CountDownLatch> preCommitLatches = new ConcurrentHashMap<>();

    /**
     * @param transformCommitLogHost
     *            host passed to the commit log when the consumer is started
     * @param transformCommitLog
     *            commit log used both to publish and to consume transform
     *            messages
     * @param domainStore
     *            store whose persistence events this listener attaches to and
     *            whose queue receives consumed messages
     */
    public ClusterTransformListener(TransformCommitLogHost transformCommitLogHost, TransformCommitLog transformCommitLog, DomainStore domainStore) {
        this.commitLogHost = transformCommitLogHost;
        this.transformCommitLog = transformCommitLog;
        this.domainStore = domainStore;
    }

    /**
     * @return always {@code true}
     */
    @Override
    public boolean isPreBarrierListener() {
        return true;
    }

    /**
     * Publishes the event's persisted requests to the commit log with a state
     * derived from the persistence event type. Events that are themselves
     * being fired from the queue are ignored, as are event types other than
     * PRE_COMMIT, COMMIT_OK and COMMIT_ERROR.
     *
     * @param domainTransformPersistenceEvent
     *            the persistence event to relay
     * @throws WrappedRuntimeException
     *             if the thread is interrupted while awaiting the PRE_COMMIT
     *             echo
     */
    @Override
    public void onDomainTransformRequestPersistence(DomainTransformPersistenceEvent domainTransformPersistenceEvent) {
        if (domainTransformPersistenceEvent.isFiringFromQueue()) {
            return;
        }
        // NOTE(review): result is unused; the call is retained in case the
        // accessor has side effects - confirm and remove if it does not.
        domainTransformPersistenceEvent.getTransformPersistenceToken();
        List<DomainTransformRequestPersistent> persistedRequests = domainTransformPersistenceEvent.getPersistedRequests();
        switch (domainTransformPersistenceEvent.getPersistenceEventType()) {
            case PRE_COMMIT:
                if (!persistedRequests.isEmpty()) {
                    publishPreCommitAndAwait(domainTransformPersistenceEvent, persistedRequests);
                }
                return;
            case COMMIT_OK:
                publishRequests(persistedRequests, ClusterTransformRequest.State.COMMIT);
                return;
            case COMMIT_ERROR:
                publishRequests(persistedRequests, ClusterTransformRequest.State.ABORTED);
                return;
            case PREPARE_COMMIT:
            default:
                return;
        }
    }

    /**
     * Publishes the requests as PRE_COMMIT and blocks until the matching
     * message has been consumed locally or the timeout elapses. The thread's
     * context classloader is switched to the servlet layer classloader for the
     * duration and always restored; the latch registration is always removed,
     * so timed-out, interrupted or failed waits do not leak map entries.
     */
    private void publishPreCommitAndAwait(DomainTransformPersistenceEvent event, List<DomainTransformRequestPersistent> persistedRequests) {
        Thread currentThread = Thread.currentThread();
        ClassLoader originalClassLoader = currentThread.getContextClassLoader();
        CountDownLatch latch = new CountDownLatch(1);
        Long requestId = null;
        try {
            currentThread.setContextClassLoader(ClassLoaderAwareRegistryProvider.get().getServletLayerClassloader());
            requestId = Long.valueOf(event.getMaxPersistedRequestId());
            // register before publishing so the consumer callback cannot miss
            // the latch
            this.preCommitLatches.put(requestId, latch);
            publishRequests(persistedRequests, ClusterTransformRequest.State.PRE_COMMIT);
            long start = System.currentTimeMillis();
            boolean released = latch.await(PRE_COMMIT_AWAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
            Long elapsed = Long.valueOf(System.currentTimeMillis() - start);
            if (released) {
                this.logger.info("Pre-commit await: request {} : {} ms", requestId, elapsed);
            } else {
                this.logger.info("Pre-commit timeout: request {} : {} ms", requestId, elapsed);
            }
        } catch (InterruptedException e) {
            // preserve the interrupt status for callers further up the stack
            currentThread.interrupt();
            throw new WrappedRuntimeException(e);
        } finally {
            if (requestId != null) {
                // two-arg remove: only drop the mapping if it is still ours
                this.preCommitLatches.remove(requestId, latch);
            }
            currentThread.setContextClassLoader(originalClassLoader);
        }
    }

    /**
     * Starts the commit log consumer (delivering to
     * {@link #handleClusterTransformRequest(ClusterTransformRequest)}) and then
     * registers this listener with the domain store's persistence events.
     *
     * @throws WrappedRuntimeException
     *             wrapping any exception raised while starting
     */
    public void startService() {
        try {
            this.transformCommitLog.consumer(this.commitLogHost, this::handleClusterTransformRequest, EntityLayerUtils.getLocalHostName(), System.currentTimeMillis());
            this.domainStore.getPersistenceEvents().addDomainTransformPersistenceListener(this);
        } catch (Exception e) {
            throw new WrappedRuntimeException(e);
        }
    }

    /**
     * Deregisters this listener from the domain store's persistence events,
     * but only when this listener's store is the current writable store.
     * NOTE(review): the consumer started in {@link #startService()} is not
     * stopped here - confirm that is handled elsewhere.
     */
    public void stopService() {
        if (this.domainStore == DomainStore.writableStore()) {
            this.domainStore.getPersistenceEvents().removeDomainTransformPersistenceListener(this);
        }
    }

    /**
     * Sends one set of commit log messages per request and logs the outcome.
     * PRE_COMMIT publications additionally log the packet count (when not
     * exactly one) and the number of transform events in the request.
     *
     * @param list
     *            the persisted requests to publish
     * @param state
     *            the cluster state to publish them with
     */
    protected void publishRequests(List<DomainTransformRequestPersistent> list, ClusterTransformRequest.State state) {
        for (DomainTransformRequestPersistent request : list) {
            List<Future<RecordMetadata>> sent = this.transformCommitLog.sendTransformPublishedMessages(request, state);
            if (state != ClusterTransformRequest.State.PRE_COMMIT) {
                this.logger.info("Published transform message: {} {}", Long.valueOf(request.getId()), state);
                continue;
            }
            try {
                String packetInfo = sent.size() == 1 ? "" : Ax.format("(%s packets) ", Integer.valueOf(sent.size()));
                this.logger.info("Published transform message: {} {}{}:: {} transforms", Long.valueOf(request.getId()), state, packetInfo, Integer.valueOf(request.getEvents().size()));
            } catch (Exception e) {
                // logging detail is best-effort; hand the exception to the
                // logger rather than printing to stderr
                this.logger.warn("Persist record issue: request {}", Long.valueOf(request.getId()), e);
            }
        }
    }

    /**
     * Consumer callback for messages read from the commit log. Forwards the
     * message to the domain store's persistence queue according to its state
     * and, for PRE_COMMIT, releases any local thread waiting on that request
     * id. Failures while handling COMMIT are logged and not propagated.
     *
     * @param clusterTransformRequest
     *            the message consumed from the commit log
     */
    void handleClusterTransformRequest(ClusterTransformRequest clusterTransformRequest) {
        this.logger.info("Received transform message: {} {}", Long.valueOf(clusterTransformRequest.id), clusterTransformRequest.state);
        DomainTransformPersistenceQueue queue = this.domainStore.getPersistenceEvents().getQueue();
        switch (clusterTransformRequest.state) {
            case PRE_COMMIT:
                queue.onRequestDataReceived(clusterTransformRequest.request, true);
                this.logger.info("Post request data received: {} {}", Long.valueOf(clusterTransformRequest.id), clusterTransformRequest.state);
                CountDownLatch latch = this.preCommitLatches.remove(Long.valueOf(clusterTransformRequest.request.getId()));
                if (latch != null) {
                    latch.countDown();
                    this.logger.info("Released latch: {} {}", Long.valueOf(clusterTransformRequest.id), clusterTransformRequest.state);
                }
                return;
            case COMMIT:
                try {
                    queue.onTransformRequestCommitted(clusterTransformRequest.id, false);
                } catch (Throwable th) {
                    // a trailing Throwable is treated by SLF4J as the exception
                    // to log, not as a placeholder value, so supply the text for
                    // '{}' explicitly and pass the throwable for its stack trace
                    this.logger.warn("DEVEX::0 - Exception in handleClusterTransformRequest::commit - {}", th.toString(), th);
                }
                return;
            case ABORTED:
                queue.onTransformRequestAborted(clusterTransformRequest.id);
                return;
            default:
                return;
        }
    }
}
