package cc.alcina.framework.servlet.cluster.transform;

import cc.alcina.framework.common.client.WrappedRuntimeException;
import cc.alcina.framework.common.client.logic.reflection.registry.Registry;
import cc.alcina.framework.common.client.util.Ax;
import cc.alcina.framework.common.client.util.CommonUtils;
import cc.alcina.framework.common.client.util.ThrowingRunnable;
import cc.alcina.framework.entity.logic.EntityLayerLogging;
import cc.alcina.framework.entity.logic.EntityLayerObjects;
import cc.alcina.framework.entity.logic.EntityLayerUtils;
import cc.alcina.framework.entity.transform.DomainTransformRequestPersistent;
import cc.alcina.framework.entity.util.MethodContext;
import cc.alcina.framework.servlet.cluster.transform.ClusterTransformRequest;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:alcina-servlet.jar:cc/alcina/framework/servlet/cluster/transform/TransformCommitLog.class */
public class TransformCommitLog {
    private Producer<Void, byte[]> producer;
    private Timer timeoutChecker;
    private String hostName;
    private Consumer<ClusterTransformRequest> payloadConsumer;
    private TransformCommitLogThread currentConsumerThread;
    private long pollTimeout;
    private TopicPartition topicPartition;
    private long hostUid;
    private TransformCommitLogHost commitLogHost;
    private AtomicBoolean initialised = new AtomicBoolean();
    private AtomicBoolean stopped = new AtomicBoolean(false);
    private AtomicInteger consumerThreadCounter = new AtomicInteger(0);
    Logger logger = LoggerFactory.getLogger(getClass());
    private long highestSeekOffset = -1;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:alcina-servlet.jar:cc/alcina/framework/servlet/cluster/transform/TransformCommitLog$TransformCommitLogThread.class */
    public final class TransformCommitLogThread extends Thread {
        private final ClassLoader cl;
        private final String tName;
        long operationStartTime;
        long operationEndTime;
        private long pollTimeout;
        private AtomicBoolean cancelled = new AtomicBoolean(false);
        private long currentOffset = -1;
        private long previousConsumerCompletedOffset;
        private KafkaConsumer<Void, byte[]> consumer;
        CountDownLatch checkCurrentPositionLatch;

        private TransformCommitLogThread(ClassLoader classLoader, String str, long j) {
            this.cl = classLoader;
            this.tName = str;
            this.previousConsumerCompletedOffset = j;
        }

        public void checkCurrentPosition() {
            this.checkCurrentPositionLatch = new CountDownLatch(1);
            try {
                if (this.consumer != null && this.currentOffset != -1) {
                    this.consumer.wakeup();
                }
                this.checkCurrentPositionLatch.await();
            } catch (Exception e) {
                throw new WrappedRuntimeException(e);
            }
        }

        public boolean connected() {
            return this.currentOffset != -1;
        }

        private <V> V performOperation(Callable<V> callable) {
            try {
                try {
                    this.operationStartTime = System.currentTimeMillis();
                    this.operationEndTime = 0L;
                    V call = callable.call();
                    this.operationEndTime = System.currentTimeMillis();
                    this.operationStartTime = 0L;
                    return call;
                } catch (Exception e) {
                    throw new WrappedRuntimeException(e);
                }
            } catch (Throwable th) {
                this.operationStartTime = 0L;
                throw th;
            }
        }

        private void performOperation(ThrowingRunnable throwingRunnable) {
            performOperation(() -> {
                throwingRunnable.run();
                return null;
            });
        }

        /* JADX WARN: Multi-variable type inference failed */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            ConsumerRecords consumerRecords;
            Thread.currentThread().setContextClassLoader(this.cl);
            Thread.currentThread().setName(this.tName);
            this.pollTimeout = TransformCommitLog.this.pollTimeout / 5;
            while (!TransformCommitLog.this.stopped.get() && !this.cancelled.get()) {
                try {
                    if (this.consumer == null) {
                        String format = Ax.format("%s-%s-%s-%s", TransformCommitLog.this.getTopic(), TransformCommitLog.this.hostName, Long.valueOf(TransformCommitLog.this.hostUid), Integer.valueOf(TransformCommitLog.this.consumerThreadCounter.get()));
                        Properties createConsumerProperties = TransformCommitLog.this.commitLogHost.createConsumerProperties(format);
                        createConsumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
                        createConsumerProperties.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
                        this.consumer = new KafkaConsumer<>(createConsumerProperties);
                        this.consumer.assign(Collections.singletonList(TransformCommitLog.this.topicPartition));
                        TransformCommitLog.this.logger.info("Started consumer :: thread {} :: groupId :: {} :: topicPartion :: {}", this.tName, format, TransformCommitLog.this.topicPartition);
                        if (!EntityLayerUtils.isTest() && !EntityLayerUtils.isTestServer()) {
                            Thread.sleep(1000L);
                        }
                    }
                    if (this.previousConsumerCompletedOffset != -1) {
                        performOperation(() -> {
                            this.consumer.seek(TransformCommitLog.this.topicPartition, this.previousConsumerCompletedOffset + 1);
                        });
                        this.previousConsumerCompletedOffset = -1L;
                    }
                    if (this.checkCurrentPositionLatch != null) {
                        TransformCommitLog.this.logger.info("{}Check current position", TransformCommitLog.this.datestampIfTest());
                        if (this.currentOffset == -1) {
                            performOperation(() -> {
                                this.consumer.seekToEnd(Collections.singletonList(TransformCommitLog.this.topicPartition));
                            });
                        }
                        long longValue = ((Long) performOperation(() -> {
                            return Long.valueOf(this.consumer.position(TransformCommitLog.this.topicPartition));
                        })).longValue();
                        TransformCommitLog.this.logger.info("{}Current position :: {} - current offset :: {}", TransformCommitLog.this.datestampIfTest(), Long.valueOf(longValue), Long.valueOf(this.currentOffset));
                        if (this.currentOffset == -1) {
                            this.currentOffset = longValue - 1;
                            TransformCommitLog.this.highestSeekOffset = Math.max(TransformCommitLog.this.highestSeekOffset, this.currentOffset);
                        }
                        if (longValue == this.currentOffset + 1) {
                            this.checkCurrentPositionLatch.countDown();
                            this.checkCurrentPositionLatch = null;
                        }
                    }
                    consumerRecords = (ConsumerRecords) performOperation(() -> {
                        return this.consumer.poll(Duration.ofMillis(this.pollTimeout));
                    });
                } catch (Throwable th) {
                    if (!CommonUtils.hasCauseOfClass(th, WakeupException.class)) {
                        th.printStackTrace();
                        Ax.out("BKC transform consumer issue");
                        if (!Ax.isTest()) {
                            EntityLayerLogging.persistentLog("BKC transform consumer issue", "KAFKA_EXCEPTION");
                        }
                    }
                    if (!(th instanceof WakeupException)) {
                        try {
                            if (this.consumer != null) {
                                this.consumer.close();
                            }
                            Thread.sleep(1000L);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    this.consumer = null;
                }
                if (TransformCommitLog.this.stopped.get() && !this.cancelled.get()) {
                    return;
                }
                boolean z = false;
                Iterator it2 = consumerRecords.iterator();
                while (it2.hasNext()) {
                    ConsumerRecord consumerRecord = (ConsumerRecord) it2.next();
                    try {
                        try {
                            ClusterTransformRequest deserialize = ((ClusterTransformSerializer) Registry.impl(ClusterTransformSerializer.class)).deserialize((byte[]) consumerRecord.value(), TransformCommitLog.this.getClass());
                            if (deserialize == null) {
                                TransformCommitLog.this.logger.info("Received partial packet - {}", ((ClusterTransformSerializer) Registry.impl(ClusterTransformSerializer.class)).getLastPartialId());
                            } else {
                                TransformCommitLog.this.logAcceptRecord(deserialize);
                                TransformCommitLog.this.payloadConsumer.accept(deserialize);
                            }
                            z = true;
                            this.currentOffset = consumerRecord.offset();
                            TransformCommitLog.this.highestSeekOffset = Math.max(TransformCommitLog.this.highestSeekOffset, this.currentOffset);
                        } catch (Throwable th2) {
                            this.currentOffset = consumerRecord.offset();
                            TransformCommitLog.this.highestSeekOffset = Math.max(TransformCommitLog.this.highestSeekOffset, this.currentOffset);
                            throw th2;
                            break;
                        }
                    } catch (Exception e2) {
                        e2.printStackTrace();
                        z = true;
                        this.currentOffset = consumerRecord.offset();
                        TransformCommitLog.this.highestSeekOffset = Math.max(TransformCommitLog.this.highestSeekOffset, this.currentOffset);
                    }
                }
                if (this.checkCurrentPositionLatch != null) {
                    this.checkCurrentPositionLatch.countDown();
                    this.checkCurrentPositionLatch = null;
                }
                if (!z) {
                    performOperation(() -> {
                        this.consumer.commitSync();
                    });
                }
            }
            TransformCommitLog.this.logger.warn("Exiting consumer thread {} -  {}", getClass().getSimpleName(), getName());
        }
    }

    protected synchronized void checkPollTimeout() {
        if (this.currentConsumerThread == null || this.currentConsumerThread.cancelled.get()) {
            return;
        }
        long j = this.currentConsumerThread.operationStartTime;
        long currentTimeMillis = System.currentTimeMillis() - j;
        int i = this.currentConsumerThread.connected() ? 1 : 4;
        if (j == 0 || currentTimeMillis <= this.pollTimeout * i) {
            return;
        }
        if (this.currentConsumerThread.currentOffset == -1) {
            this.logger.warn("Restarting {} consumer - not started - ordinal {} ", getClass().getSimpleName(), Integer.valueOf(this.consumerThreadCounter.get()));
        }
        long j2 = this.currentConsumerThread.currentOffset;
        if (j2 == -1) {
            j2 = this.currentConsumerThread.previousConsumerCompletedOffset;
        }
        if (j2 == -1) {
            j2 = this.highestSeekOffset;
        }
        this.logger.warn("Restarting {} consumer - poll timeout: {} - max {} - ordinal {} - seekTo {}", getClass().getSimpleName(), Long.valueOf(currentTimeMillis), Long.valueOf(this.pollTimeout), Integer.valueOf(this.consumerThreadCounter.get()), Long.valueOf(j2));
        this.currentConsumerThread.cancelled.set(true);
        launchConsumerThread(j2);
    }

    public void consumer(TransformCommitLogHost transformCommitLogHost, Consumer<ClusterTransformRequest> consumer, String str, long j) {
        this.commitLogHost = transformCommitLogHost;
        this.payloadConsumer = consumer;
        this.hostName = str;
        this.hostUid = j;
        this.pollTimeout = transformCommitLogHost.getPollTimeout();
        if (this.initialised.getAndSet(true)) {
            return;
        }
        this.topicPartition = new TopicPartition(getTopic(), 0);
        this.logger.info("{}Launch consumer thread :: {}", datestampIfTest(), str);
        launchConsumerThread(-1L);
        this.logger.info("Started queue :: host {} :: offset {}", str, Long.valueOf(this.currentConsumerThread.currentOffset));
        this.timeoutChecker = new Timer(true);
        this.timeoutChecker.scheduleAtFixedRate(new TimerTask() { // from class: cc.alcina.framework.servlet.cluster.transform.TransformCommitLog.1
            @Override // java.util.TimerTask, java.lang.Runnable
            public void run() {
                TransformCommitLog.this.checkPollTimeout();
            }
        }, this.pollTimeout, this.pollTimeout);
        MethodContext.instance().withMetricKey("tcl-initial-position").run(() -> {
            refreshCurrentPosition();
        });
    }

    private String datestampIfTest() {
        return Ax.isTest() ? Ax.timestamp(new Date()) + " " : "";
    }

    public long getCommitPollReturned() {
        if (this.currentConsumerThread == null) {
            return -1L;
        }
        return this.currentConsumerThread.operationEndTime;
    }

    public long getCommitPosition() {
        if (this.currentConsumerThread == null) {
            return -1L;
        }
        return this.currentConsumerThread.currentOffset;
    }

    protected String getTopic() {
        return this.commitLogHost.getTopic();
    }

    private void launchConsumerThread(long j) {
        ClassLoader servletLayerClassLoader = EntityLayerObjects.get().getServletLayerClassLoader();
        String format = Ax.format("kafka-consumer-%s-%s", getClass().getSimpleName(), Integer.valueOf(this.consumerThreadCounter.incrementAndGet()));
        CountDownLatch countDownLatch = null;
        if (this.currentConsumerThread != null) {
            countDownLatch = this.currentConsumerThread.checkCurrentPositionLatch;
            this.logger.info("Restarting consumer with existing position latch");
        }
        this.currentConsumerThread = new TransformCommitLogThread(servletLayerClassLoader, format, j);
        this.currentConsumerThread.checkCurrentPositionLatch = countDownLatch;
        this.currentConsumerThread.start();
    }

    protected void logAcceptRecord(ClusterTransformRequest clusterTransformRequest) {
    }

    /* JADX WARN: Multi-variable type inference failed */
    public Producer<Void, byte[]> producer() {
        if (this.producer == null) {
            Properties createProducerProperties = this.commitLogHost.createProducerProperties(getClass());
            createProducerProperties.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            createProducerProperties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            this.producer = new KafkaProducer(createProducerProperties);
        }
        return this.producer;
    }

    public void refreshCurrentPosition() {
        if (!EntityLayerUtils.isTest() || this.currentConsumerThread.currentOffset == -1) {
            this.currentConsumerThread.checkCurrentPosition();
        }
    }

    public synchronized List<Future<RecordMetadata>> sendTransformPublishedMessages(DomainTransformRequestPersistent domainTransformRequestPersistent, ClusterTransformRequest.State state) {
        return sendTransformPublishedMessages0(domainTransformRequestPersistent, state);
    }

    private List<Future<RecordMetadata>> sendTransformPublishedMessages0(DomainTransformRequestPersistent domainTransformRequestPersistent, ClusterTransformRequest.State state) {
        if (producer() != null) {
            return (List) serialize(domainTransformRequestPersistent, state).stream().map(bArr -> {
                return producer().send(new ProducerRecord(getTopic(), (Object) null, bArr));
            }).collect(Collectors.toList());
        }
        this.logger.warn("Mpt sending transform packets - no producer");
        return new ArrayList();
    }

    private List<byte[]> serialize(DomainTransformRequestPersistent domainTransformRequestPersistent, ClusterTransformRequest.State state) {
        return ((ClusterTransformSerializer) Registry.impl(ClusterTransformSerializer.class)).serialize(domainTransformRequestPersistent, state);
    }

    public void shutdown() {
        this.stopped.set(true);
        if (this.producer != null) {
            this.producer.close();
        }
        if (this.timeoutChecker != null) {
            this.timeoutChecker.cancel();
        }
    }
}
