package io.moquette.broker.unsafequeues;

import io.moquette.broker.unsafequeues.PagedFilesAllocator;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/moquette/broker/unsafequeues/QueuePool.class */
public class QueuePool {
    private static final Logger LOG = LoggerFactory.getLogger(QueuePool.class);
    static final boolean queueDebug = Boolean.parseBoolean(System.getProperty("moquette.queue.debug", "false"));
    private final SegmentAllocator allocator;
    private final Path dataPath;
    private final int segmentSize;
    private final ConcurrentMap<QueueName, LinkedList<SegmentRef>> queueSegments = new ConcurrentHashMap();
    private final ConcurrentMap<QueueName, Queue> queues = new ConcurrentHashMap();
    private final ConcurrentSkipListSet<SegmentRef> recycledSegments = new ConcurrentSkipListSet<>();
    private final ReentrantLock segmentsAllocationLock = new ReentrantLock();
    private final SegmentAllocationCallback callback = new SegmentAllocationCallback();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/moquette/broker/unsafequeues/QueuePool$QueueName.class */
    public static class QueueName {
        final String name;

        private QueueName(String str) {
            this.name = str;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            return Objects.equals(this.name, ((QueueName) obj).name);
        }

        public int hashCode() {
            return Objects.hash(this.name);
        }

        public String toString() {
            return "QueueName{name='" + this.name + "'}";
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/moquette/broker/unsafequeues/QueuePool$SegmentAllocationCallback.class */
    public static class SegmentAllocationCallback implements PagedFilesAllocator.AllocationListener {
        private final QueuePool queuePool;

        private SegmentAllocationCallback(QueuePool queuePool) {
            this.queuePool = queuePool;
        }

        @Override // io.moquette.broker.unsafequeues.PagedFilesAllocator.AllocationListener
        public void segmentedCreated(String str, Segment segment) {
            this.queuePool.segmentedCreated(str, segment);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/moquette/broker/unsafequeues/QueuePool$SegmentRef.class */
    public static class SegmentRef implements Comparable<SegmentRef> {
        final int pageId;
        final int offset;

        SegmentRef(int i, int i2) {
            this.pageId = i;
            this.offset = i2;
        }

        public SegmentRef(Segment segment) {
            this.pageId = segment.begin.pageId();
            this.offset = segment.begin.offset();
        }

        public String toString() {
            return String.format("(%d, %d)", Integer.valueOf(this.pageId), Integer.valueOf(this.offset));
        }

        @Override // java.lang.Comparable
        public int compareTo(SegmentRef segmentRef) {
            int compare = Integer.compare(this.pageId, segmentRef.pageId);
            return compare != 0 ? compare : Integer.compare(this.offset, segmentRef.offset);
        }
    }

    private QueuePool(SegmentAllocator segmentAllocator, Path path, int i) {
        this.allocator = segmentAllocator;
        this.dataPath = path;
        this.segmentSize = i;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void segmentedCreated(String str, Segment segment) {
        LOG.debug("Registering new segment {} for queue {}", segment, str);
        QueueName queueName = new QueueName(str);
        LinkedList<SegmentRef> computeIfAbsent = this.queueSegments.computeIfAbsent(queueName, queueName2 -> {
            return new LinkedList();
        });
        computeIfAbsent.add(0, new SegmentRef(segment));
        LOG.debug("queueSegments for queue {} after insertion {}", queueName, computeIfAbsent);
    }

    public static QueuePool loadQueues(Path path, int i, int i2) throws QueueException {
        Properties createOrLoadCheckpointFile = createOrLoadCheckpointFile(path);
        QueuePool queuePool = new QueuePool(new PagedFilesAllocator(path, i, i2, Integer.parseInt(createOrLoadCheckpointFile.getProperty("segments.last_page", "0")), Integer.parseInt(createOrLoadCheckpointFile.getProperty("segments.last_segment", "0"))), path, i2);
        queuePool.loadQueueDefinitions(createOrLoadCheckpointFile);
        LOG.debug("Loaded queues definitions: {}", queuePool.queueSegments);
        queuePool.loadRecycledSegments(createOrLoadCheckpointFile);
        LOG.debug("Recyclable segments are: {}", queuePool.recycledSegments);
        return queuePool;
    }

    public Set<String> queueNames() {
        return (Set) this.queues.keySet().stream().map(queueName -> {
            return queueName.name;
        }).collect(Collectors.toSet());
    }

    private static Properties createOrLoadCheckpointFile(Path path) throws QueueException {
        Path resolve = path.resolve("checkpoint.properties");
        if (!Files.exists(resolve, new LinkOption[0])) {
            LOG.info("Can't find any file named 'checkpoint.properties' in path: {}, creating new one", path);
            try {
                if (!resolve.toFile().createNewFile()) {
                    LOG.warn("Found a checkpoint file while bootstrapping {}", resolve);
                }
            } catch (IOException e) {
                LOG.error("IO Error creating the file {}", resolve, e);
                throw new QueueException("Reached an IO error during the bootstrapping of empty 'checkpoint.properties'", e);
            }
        }
        try {
            FileReader fileReader = new FileReader(resolve.toFile());
            Properties properties = new Properties();
            try {
                properties.load(fileReader);
                return properties;
            } catch (IOException e2) {
                throw new QueueException("if an error occurred when reading from: " + resolve, e2);
            }
        } catch (FileNotFoundException e3) {
            throw new QueueException("Can't find any file named 'checkpoint.properties' in path: " + path, e3);
        }
    }

    private void loadQueueDefinitions(Properties properties) throws QueueException {
        boolean z = false;
        int i = 0;
        while (!z) {
            String format = String.format("queues.%d.name", Integer.valueOf(i));
            if (properties.containsKey(format)) {
                QueueName queueName = new QueueName(properties.getProperty(format));
                LinkedList<SegmentRef> decodeSegments = decodeSegments(properties.getProperty(String.format("queues.%d.segments", Integer.valueOf(i))));
                int size = decodeSegments.size();
                this.queueSegments.put(queueName, decodeSegments);
                long parseLong = Long.parseLong(properties.getProperty(String.format("queues.%d.head_offset", Integer.valueOf(i))));
                SegmentRef segmentRef = decodeSegments.get(0);
                SegmentPointer segmentPointer = new SegmentPointer(segmentRef.pageId, parseLong);
                Segment reopenSegment = this.allocator.reopenSegment(segmentRef.pageId, segmentRef.offset);
                long parseLong2 = Long.parseLong(properties.getProperty(String.format("queues.%d.tail_offset", Integer.valueOf(i))));
                SegmentRef last = decodeSegments.getLast();
                SegmentPointer segmentPointer2 = new SegmentPointer(last.pageId, parseLong2);
                this.queues.put(queueName, new Queue(queueName.name, reopenSegment, new VirtualPointer(((size - 1) * this.segmentSize) + segmentPointer.offset()), this.allocator.reopenSegment(last.pageId, last.offset), new VirtualPointer(segmentPointer2.offset()), this.allocator, this.callback, this));
                i++;
            } else {
                z = true;
            }
        }
    }

    private void loadRecycledSegments(Properties properties) throws QueueException {
        TreeSet<SegmentRef> treeSet = new TreeSet<>();
        boolean z = false;
        int i = 0;
        while (!z) {
            if (properties.containsKey(String.format("queues.%d.name", Integer.valueOf(i)))) {
                treeSet.addAll(decodeSegments(properties.getProperty(String.format("queues.%d.segments", Integer.valueOf(i)))));
                i++;
            } else {
                z = true;
            }
        }
        if (treeSet.isEmpty()) {
            return;
        }
        List<SegmentRef> recreateSegmentHoles = recreateSegmentHoles(treeSet);
        this.segmentsAllocationLock.lock();
        try {
            this.recycledSegments.addAll(recreateSegmentHoles);
            this.segmentsAllocationLock.unlock();
        } catch (Throwable th) {
            this.segmentsAllocationLock.unlock();
            throw th;
        }
    }

    List<SegmentRef> recreateSegmentHoles(TreeSet<SegmentRef> treeSet) throws QueueException {
        if (treeSet.isEmpty()) {
            throw new QueueException("Status error, expected to find at least one segment");
        }
        SegmentRef segmentRef = null;
        LinkedList linkedList = new LinkedList();
        Iterator<SegmentRef> it = treeSet.iterator();
        while (it.hasNext()) {
            SegmentRef next = it.next();
            if (segmentRef == null) {
                linkedList.addAll(recreateRecycledSegmentsBetween(next));
                segmentRef = next;
            } else if (isAdjacent(segmentRef, next)) {
                segmentRef = next;
            } else if (segmentRef.pageId == next.pageId) {
                linkedList.addAll(recreateRecycledSegments(segmentRef.offset + this.segmentSize, next.offset, segmentRef.pageId));
            } else {
                linkedList.addAll(recreateRecycledSegmentsBetween(segmentRef, next));
            }
        }
        return linkedList;
    }

    private boolean isAdjacent(SegmentRef segmentRef, SegmentRef segmentRef2) {
        return segmentRef.pageId == segmentRef2.pageId ? segmentRef.offset + this.segmentSize == segmentRef2.offset : segmentRef.pageId + 1 == segmentRef2.pageId && segmentRef.offset == this.allocator.getPageSize() - this.segmentSize && segmentRef2.offset == 0;
    }

    private List<SegmentRef> recreateRecycledSegmentsBetween(SegmentRef segmentRef) {
        return recreateRecycledSegmentsBetween(null, segmentRef);
    }

    private List<SegmentRef> recreateRecycledSegmentsBetween(SegmentRef segmentRef, SegmentRef segmentRef2) {
        LinkedList linkedList = new LinkedList();
        int i = 0;
        if (segmentRef != null) {
            int i2 = segmentRef.pageId;
            linkedList.addAll(recreateRecycledSegments(segmentRef.offset + this.segmentSize, this.allocator.getPageSize(), segmentRef.pageId));
            i = i2 + 1;
        }
        while (i < segmentRef2.pageId) {
            linkedList.addAll(recreateRecycledSegments(0, this.allocator.getPageSize(), i));
            i++;
        }
        linkedList.addAll(recreateRecycledSegments(0, segmentRef2.offset, segmentRef2.pageId));
        return linkedList;
    }

    private List<SegmentRef> recreateRecycledSegments(int i, int i2, int i3) {
        LinkedList linkedList = new LinkedList();
        while (i != i2) {
            linkedList.add(new SegmentRef(i3, i));
            i += this.segmentSize;
        }
        return linkedList;
    }

    private LinkedList<SegmentRef> decodeSegments(String str) {
        String[] split = str.substring(str.indexOf("(") + 1, str.lastIndexOf(")")).split("\\), \\(");
        LinkedList<SegmentRef> linkedList = new LinkedList<>();
        for (String str2 : split) {
            String[] split2 = str2.split(",");
            linkedList.offer(new SegmentRef(Integer.parseInt(split2[0].trim()), Integer.parseInt(split2[1].trim())));
        }
        return linkedList;
    }

    public Queue getOrCreate(String str) throws QueueException {
        QueueName queueName = new QueueName(str);
        if (this.queues.containsKey(queueName)) {
            return this.queues.get(queueName);
        }
        Segment nextFreeSegment = nextFreeSegment();
        segmentedCreated(str, nextFreeSegment);
        Queue queue = new Queue(str, nextFreeSegment, VirtualPointer.buildUntouched(), nextFreeSegment, VirtualPointer.buildUntouched(), this.allocator, this.callback, this);
        this.queues.put(queueName, queue);
        return queue;
    }

    public void close() throws QueueException {
        this.allocator.close();
        Properties properties = new Properties();
        this.allocator.dumpState(properties);
        for (Map.Entry<QueueName, LinkedList<SegmentRef>> entry : this.queueSegments.entrySet()) {
            QueueName key = entry.getKey();
            properties.setProperty("queues.0.name", key.name);
            properties.setProperty("queues.0.segments", (String) entry.getValue().stream().map((v0) -> {
                return v0.toString();
            }).collect(Collectors.joining(", ")));
            Queue queue = this.queues.get(key);
            properties.setProperty("queues.0.head_offset", String.valueOf(queue.currentHead().segmentOffset(this.segmentSize)));
            properties.setProperty("queues.0.tail_offset", String.valueOf(queue.currentTail().segmentOffset(this.segmentSize)));
        }
        try {
            try {
                properties.store(new FileWriter(this.dataPath.resolve("checkpoint.properties").toFile()), "DON'T EDIT, AUTOGENERATED");
            } catch (IOException e) {
                throw new QueueException("Problem writing checkpoint.properties file", e);
            }
        } catch (IOException e2) {
            throw new QueueException("Problem opening checkpoint.properties file", e2);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Optional<Segment> openNextTailSegment(String str) throws QueueException {
        SegmentRef peekLast = this.queueSegments.get(new QueueName(str)).peekLast();
        if (peekLast == null) {
            return Optional.empty();
        }
        Path resolve = this.dataPath.resolve(String.format("%d.page", Integer.valueOf(peekLast.pageId)));
        if (!Files.exists(resolve, new LinkOption[0])) {
            throw new QueueException("Can't find file for page file" + resolve);
        }
        try {
            FileChannel open = FileChannel.open(resolve, StandardOpenOption.READ, StandardOpenOption.WRITE);
            try {
                MappedByteBuffer map = open.map(FileChannel.MapMode.READ_WRITE, 0L, this.allocator.getPageSize());
                if (open != null) {
                    open.close();
                }
                return Optional.of(new Segment(map, new SegmentPointer(peekLast.pageId, peekLast.offset), new SegmentPointer(peekLast.pageId, (peekLast.offset + this.segmentSize) - 1)));
            } finally {
            }
        } catch (IOException e) {
            throw new QueueException("Can't open page file " + resolve, e);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void consumedTailSegment(String str) {
        QueueName queueName = new QueueName(str);
        SegmentRef pollLast = this.queueSegments.get(queueName).pollLast();
        LOG.debug("Consumed tail segment {} from queue {}", pollLast, queueName);
        this.segmentsAllocationLock.lock();
        try {
            this.recycledSegments.add(pollLast);
            this.segmentsAllocationLock.unlock();
        } catch (Throwable th) {
            this.segmentsAllocationLock.unlock();
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Segment nextFreeSegment() throws QueueException {
        this.segmentsAllocationLock.lock();
        try {
            if (this.recycledSegments.isEmpty()) {
                LOG.debug("no recycled segments available, request the creation of new one");
                return this.allocator.nextFreeSegment();
            }
            SegmentRef pollFirst = this.recycledSegments.pollFirst();
            if (pollFirst == null) {
                throw new QueueException("Invalid state, expected available recycled segment");
            }
            LOG.debug("Reusing recycled segment from page: {} at page offset: {}", Integer.valueOf(pollFirst.pageId), Integer.valueOf(pollFirst.offset));
            return this.allocator.reopenSegment(pollFirst.pageId, pollFirst.offset);
        } finally {
            this.segmentsAllocationLock.unlock();
        }
    }
}
