/*
 * Decompiled with CFR 0.152.
 */
package kieker.analysis.plugin.filter.forward;

import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import kieker.analysis.plugin.annotation.InputPort;
import kieker.analysis.plugin.annotation.OutputPort;
import kieker.analysis.plugin.annotation.Plugin;
import kieker.analysis.plugin.annotation.Property;
import kieker.analysis.plugin.filter.AbstractFilterPlugin;
import kieker.common.configuration.Configuration;
import kieker.common.logging.Log;
import kieker.common.logging.LogFactory;
import kieker.common.record.IMonitoringRecord;

@Plugin(description="Forwards incoming records with delays computed from the timestamp values", outputPorts={@OutputPort(name="outputRecords", eventTypes={IMonitoringRecord.class}, description="Outputs the delayed records")}, configuration={@Property(name="numWorkers", defaultValue="1"), @Property(name="additionalShutdownDelaySeconds", defaultValue="5")})
public class RealtimeRecordDelayFilter
extends AbstractFilterPlugin {
    public static final String INPUT_PORT_NAME_RECORDS = "inputRecords";
    public static final String OUTPUT_PORT_NAME_RECORDS = "outputRecords";
    public static final String CONFIG_PROPERTY_NAME_NUM_WORKERS = "numWorkers";
    public static final String CONFIG_PROPERTY_NAME_ADDITIONAL_SHUTDOWN_DELAY_SECONDS = "additionalShutdownDelaySeconds";
    private static final Log LOG = LogFactory.getLog(RealtimeRecordDelayFilter.class);
    private static final long WARN_ON_NEGATIVE_SCHED_TIME_NANOS = TimeUnit.NANOSECONDS.convert(2L, TimeUnit.SECONDS);
    private final int numWorkers;
    private final ScheduledThreadPoolExecutor executor;
    private final long shutdownDelayNanos;
    private volatile long startTime = -1L;
    private volatile long firstLoggingTimestamp;
    private volatile long latestSchedulingTimeNanos = -1L;

    public RealtimeRecordDelayFilter(Configuration configuration) {
        super(configuration);
        this.numWorkers = configuration.getIntProperty(CONFIG_PROPERTY_NAME_NUM_WORKERS);
        this.shutdownDelayNanos = TimeUnit.NANOSECONDS.convert(this.configuration.getLongProperty(CONFIG_PROPERTY_NAME_ADDITIONAL_SHUTDOWN_DELAY_SECONDS), TimeUnit.SECONDS);
        this.executor = new ScheduledThreadPoolExecutor(this.numWorkers);
        this.executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(true);
        this.executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @InputPort(name="inputRecords", eventTypes={IMonitoringRecord.class}, description="Receives the records to be delayed")
    public final void inputRecord(final IMonitoringRecord monitoringRecord) {
        long currentTimeNanos = this.currentTimeNanos();
        RealtimeRecordDelayFilter realtimeRecordDelayFilter = this;
        synchronized (realtimeRecordDelayFilter) {
            long absSchedTime;
            long schedTimeNanosFromNow;
            if (this.startTime == -1L) {
                this.firstLoggingTimestamp = monitoringRecord.getLoggingTimestamp();
                this.startTime = currentTimeNanos;
            }
            if ((schedTimeNanosFromNow = monitoringRecord.getLoggingTimestamp() - this.firstLoggingTimestamp - (currentTimeNanos - this.startTime)) < -WARN_ON_NEGATIVE_SCHED_TIME_NANOS) {
                long schedTimeSeconds = TimeUnit.SECONDS.convert(schedTimeNanosFromNow, TimeUnit.NANOSECONDS);
                LOG.warn("negative scheduling time: " + schedTimeNanosFromNow + " (nanos) / " + schedTimeSeconds + " (seconds)-> scheduling with a delay of 0");
            }
            if (schedTimeNanosFromNow < 0L) {
                schedTimeNanosFromNow = 0L;
            }
            if ((absSchedTime = currentTimeNanos + schedTimeNanosFromNow) > this.latestSchedulingTimeNanos) {
                this.latestSchedulingTimeNanos = absSchedTime;
            }
            this.executor.schedule(new Runnable(){

                public void run() {
                    RealtimeRecordDelayFilter.this.deliver(RealtimeRecordDelayFilter.OUTPUT_PORT_NAME_RECORDS, monitoringRecord);
                }
            }, schedTimeNanosFromNow, TimeUnit.NANOSECONDS);
        }
    }

    private long currentTimeNanos() {
        return TimeUnit.NANOSECONDS.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    public void terminate(boolean error) {
        this.executor.shutdown();
        if (!error) {
            long shutdownDelaySecondsFromNow = TimeUnit.SECONDS.convert(this.latestSchedulingTimeNanos - this.currentTimeNanos() + this.shutdownDelayNanos, TimeUnit.NANOSECONDS);
            if (shutdownDelaySecondsFromNow < 0L) {
                shutdownDelaySecondsFromNow = 0L;
            }
            try {
                LOG.info("Awaiting termination delay of " + shutdownDelaySecondsFromNow + " seconds ...");
                if (!this.executor.awaitTermination(shutdownDelaySecondsFromNow, TimeUnit.SECONDS)) {
                    LOG.error("Termination delay triggerred before all scheduled records sent");
                }
            }
            catch (InterruptedException e) {
                LOG.error("Interrupted while awaiting termination delay", e);
            }
        }
    }

    public Configuration getCurrentConfiguration() {
        Configuration configuration = new Configuration();
        configuration.setProperty(CONFIG_PROPERTY_NAME_NUM_WORKERS, Integer.toString(this.numWorkers));
        configuration.setProperty(CONFIG_PROPERTY_NAME_ADDITIONAL_SHUTDOWN_DELAY_SECONDS, Long.toString(this.shutdownDelayNanos));
        return configuration;
    }
}

