package io.moquette.broker;

import io.moquette.broker.Session;
import io.moquette.broker.subscriptions.ISubscriptionsDirectory;
import io.moquette.broker.subscriptions.Subscription;
import io.moquette.broker.subscriptions.Topic;
import io.moquette.interception.BrokerInterceptor;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttSubAckPayload;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: classes2.dex */
/**
 * Central dispatcher of the broker: forwards PUBLISH payloads to the sessions whose
 * subscriptions match a topic, maintains the retained-message repository, registers and
 * removes subscriptions, and relays connection/publish/subscription events to the
 * {@link BrokerInterceptor}.
 *
 * <p>This source is decompiler output; members flagged by the decompiler as originally
 * package-private are noted as such in their documentation.
 */
public class PostOffice {
    private static final Logger LOG = LoggerFactory.getLogger(PostOffice.class);
    private final Authorizator authorizator;
    private final BrokerInterceptor interceptor;
    private final IRetainedRepository retainedRepository;
    // Not final: may be replaced after construction through init(SessionRegistry).
    private SessionRegistry sessionRegistry;
    private final ISubscriptionsDirectory subscriptions;

    /**
     * Creates the dispatcher (originally package-private).
     *
     * @param iSubscriptionsDirectory directory used to store and match subscriptions
     * @param iRetainedRepository     store of retained messages
     * @param sessionRegistry         registry used to look sessions up by client id
     * @param brokerInterceptor       receiver of broker event notifications
     * @param authorizator            checks read/write permissions on topics
     */
    public PostOffice(ISubscriptionsDirectory iSubscriptionsDirectory, IRetainedRepository iRetainedRepository, SessionRegistry sessionRegistry, BrokerInterceptor brokerInterceptor, Authorizator authorizator) {
        this.authorizator = authorizator;
        this.subscriptions = iSubscriptionsDirectory;
        this.retainedRepository = iRetainedRepository;
        this.sessionRegistry = sessionRegistry;
        this.interceptor = brokerInterceptor;
    }

    /**
     * Builds the SUBACK answering a SUBSCRIBE: one return code per entry of {@code list},
     * in the same order, taken from each entry's quality-of-service value.
     *
     * @param list the topic subscriptions as returned by the authorization check
     * @param i    the message id to echo in the SUBACK
     * @return the SUBACK message
     */
    private MqttSubAckMessage doAckMessageFromValidateFilters(List<MqttTopicSubscription> list, int i) {
        List<Integer> grantedQosLevels = new ArrayList<>(list.size());
        for (MqttTopicSubscription request : list) {
            grantedQosLevels.add(request.qualityOfService().value());
        }
        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
        return new MqttSubAckMessage(fixedHeader, MqttMessageIdVariableHeader.from(i), new MqttSubAckPayload(grantedQosLevels));
    }

    /**
     * Filter used by {@link #subscribeClientToTopics}: accepts every subscription request
     * whose QoS is not {@link MqttQoS#FAILURE}.
     *
     * <p>Compiler-generated lambda body exposed by the decompiler; the name and visibility
     * are kept so that anything referencing it keeps working.
     */
    public static /* synthetic */ boolean lambda$subscribeClientToTopics$0(MqttTopicSubscription mqttTopicSubscription) {
        return mqttTopicSubscription.qualityOfService() != MqttQoS.FAILURE;
    }

    /**
     * Mapper used by {@link #subscribeClientToTopics}: converts a subscription request into
     * a {@link Subscription} owned by client {@code str}.
     *
     * <p>Compiler-generated lambda body exposed by the decompiler; the name and visibility
     * are kept so that anything referencing it keeps working.
     */
    public static /* synthetic */ Subscription lambda$subscribeClientToTopics$1(String str, MqttTopicSubscription mqttTopicSubscription) {
        return new Subscription(str, new Topic(mqttTopicSubscription.topicName()), mqttTopicSubscription.qualityOfService());
    }

    /**
     * Caps a message QoS at the level requested by the subscription.
     *
     * @param subscription the matching subscription
     * @param mqttQoS      the QoS of the message being delivered
     * @return the subscription's requested QoS when {@code mqttQoS} is numerically higher,
     *         otherwise {@code mqttQoS}
     */
    static MqttQoS lowerQosToTheSubscriptionDesired(Subscription subscription, MqttQoS mqttQoS) {
        MqttQoS requested = subscription.getRequestedQos();
        return mqttQoS.value() > requested.value() ? requested : mqttQoS;
    }

    /**
     * Sends the payload to every session having a subscription that matches {@code topic},
     * each at the lower of the publish QoS and the subscription's requested QoS.
     * Subscriptions whose session is not found in the registry are only logged.
     */
    private void publish2Subscribers(ByteBuf byteBuf, Topic topic, MqttQoS mqttQoS) {
        for (Subscription subscription : this.subscriptions.matchQosSharpening(topic)) {
            MqttQoS deliveryQos = lowerQosToTheSubscriptionDesired(subscription, mqttQoS);
            Session targetSession = this.sessionRegistry.retrieve(subscription.getClientId());
            if (targetSession == null) {
                LOG.debug("PUBLISH to not yet present session. CId: {}, topicFilter: {}, qos: {}", subscription.getClientId(), subscription.getTopicFilter(), deliveryQos);
                continue;
            }
            LOG.debug("Sending PUBLISH message to active subscriber CId: {}, topicFilter: {}, qos: {}", subscription.getClientId(), subscription.getTopicFilter(), deliveryQos);
            targetSession.sendPublishOnSessionAtQos(topic, deliveryQos, byteBuf);
        }
    }

    /**
     * Delivers to client {@code str} the retained messages stored for the topic filter of
     * each subscription in {@code list}.
     */
    private void publishRetainedMessagesForSubscriptions(String str, List<Subscription> list) {
        Session targetSession = this.sessionRegistry.retrieve(str);
        for (Subscription subscription : list) {
            List<RetainedMessage> retainedMessages = this.retainedRepository.retainedOnTopic(subscription.getTopicFilter().toString());
            for (RetainedMessage retained : retainedMessages) {
                MqttQoS deliveryQos = lowerQosToTheSubscriptionDesired(subscription, retained.qosLevel());
                ByteBuf payloadBuf = Unpooled.wrappedBuffer(retained.getPayload());
                try {
                    targetSession.sendRetainedPublishOnSessionAtQos(retained.getTopic(), deliveryQos, payloadBuf);
                } finally {
                    // The buffer is created here, so it is released here, even if the send throws.
                    payloadBuf.release();
                }
            }
        }
    }

    /**
     * Stores {@code mqttPublishMessage} as the retained message of {@code topic} when
     * {@code payload} still has readable bytes, otherwise clears the topic's retained message.
     * Shared by the QoS 1 and QoS 2 publish paths.
     */
    private void retainOrClean(Topic topic, MqttPublishMessage mqttPublishMessage, ByteBuf payload) {
        if (payload.isReadable()) {
            this.retainedRepository.retain(topic, mqttPublishMessage);
        } else {
            this.retainedRepository.cleanRetained(topic);
        }
    }

    /** Notifies the interceptor that a client connected (originally package-private). */
    public void dispatchConnection(MqttConnectMessage mqttConnectMessage) {
        this.interceptor.notifyClientConnected(mqttConnectMessage);
    }

    /**
     * Forwards a connection-lost event to the interceptor (originally package-private).
     * Both arguments are passed through unchanged, in the same order.
     */
    public void dispatchConnectionLost(String str, String str2) {
        this.interceptor.notifyClientConnectionLost(str, str2);
    }

    /**
     * Forwards a disconnection event to the interceptor (originally package-private).
     * Both arguments are passed through unchanged, in the same order.
     */
    public void dispatchDisconnection(String str, String str2) {
        this.interceptor.notifyClientDisconnected(str, str2);
    }

    /** Publishes the will's payload on the will's topic at the will's QoS. */
    public void fireWill(Session.Will will) {
        publish2Subscribers(will.payload, new Topic(will.topic), will.qos);
    }

    /** Replaces the session registry used to look up target sessions. */
    public void init(SessionRegistry sessionRegistry) {
        this.sessionRegistry = sessionRegistry;
    }

    /**
     * Publishes a message without any authorization check and without notifying the
     * interceptor. When the retain flag is set, the topic's retained message is cleared if
     * the QoS is {@link MqttQoS#AT_MOST_ONCE} or the payload is empty; otherwise the message
     * is stored as retained.
     */
    public void internalPublish(MqttPublishMessage mqttPublishMessage) {
        MqttQoS qos = mqttPublishMessage.fixedHeader().qosLevel();
        Topic topic = new Topic(mqttPublishMessage.variableHeader().topicName());
        ByteBuf payload = mqttPublishMessage.payload();
        LOG.info("Sending internal PUBLISH message Topic={}, qos={}", topic, qos);
        publish2Subscribers(payload, topic, qos);
        if (!mqttPublishMessage.fixedHeader().isRetain()) {
            return;
        }
        if (qos == MqttQoS.AT_MOST_ONCE || payload.readableBytes() == 0) {
            this.retainedRepository.cleanRetained(topic);
        } else {
            this.retainedRepository.retain(topic, mqttPublishMessage);
        }
    }

    /**
     * Handles a QoS 0 PUBLISH (originally package-private). If the write-permission check
     * fails, the message is dropped with an error log. Otherwise it is forwarded to
     * subscribers; a set retain flag clears the topic's retained message (nothing is
     * stored on this path), and the interceptor is notified.
     *
     * @param topic              destination topic
     * @param str                second argument of the write-permission check, last argument of the notification
     * @param str2               third argument of the write-permission check, second argument of the notification
     * @param mqttPublishMessage the received PUBLISH
     */
    public void receivedPublishQos0(Topic topic, String str, String str2, MqttPublishMessage mqttPublishMessage) {
        if (!this.authorizator.canWrite(topic, str, str2)) {
            LOG.error("client is not authorized to publish on topic: {}", topic);
            return;
        }
        publish2Subscribers(mqttPublishMessage.payload(), topic, MqttQoS.AT_MOST_ONCE);
        if (mqttPublishMessage.fixedHeader().isRetain()) {
            this.retainedRepository.cleanRetained(topic);
        }
        this.interceptor.notifyTopicPublished(mqttPublishMessage, str2, str);
    }

    /**
     * Handles a QoS 1 PUBLISH (originally package-private). An invalid topic drops the
     * connection; a failed write-permission check drops the message without sending a
     * PUBACK. Otherwise the message is forwarded to subscribers, acknowledged with PUBACK
     * id {@code i}, retained or cleared according to its retain flag and payload, and
     * reported to the interceptor.
     */
    public void receivedPublishQos1(MQTTConnection mQTTConnection, Topic topic, String str, int i, MqttPublishMessage mqttPublishMessage) {
        // Result intentionally unused; kept from the original ahead of the validity check.
        // NOTE(review): presumably forces the topic to be parsed so isValid() is meaningful - confirm in Topic.
        topic.getTokens();
        if (!topic.isValid()) {
            LOG.warn("Invalid topic format, force close the connection");
            mQTTConnection.dropConnection();
            return;
        }
        String clientId = mQTTConnection.getClientId();
        if (!this.authorizator.canWrite(topic, str, clientId)) {
            LOG.error("MQTT client: {} is not authorized to publish on topic: {}", clientId, topic);
            return;
        }
        ByteBuf payload = mqttPublishMessage.payload();
        publish2Subscribers(payload, topic, MqttQoS.AT_LEAST_ONCE);
        mQTTConnection.sendPubAck(i);
        if (mqttPublishMessage.fixedHeader().isRetain()) {
            retainOrClean(topic, mqttPublishMessage, payload);
        }
        this.interceptor.notifyTopicPublished(mqttPublishMessage, clientId, str);
    }

    /**
     * Handles a QoS 2 PUBLISH (originally package-private). A failed write-permission
     * check drops the message with an error log. Otherwise the message is forwarded to
     * subscribers, retained or cleared according to its retain flag and payload, and
     * reported to the interceptor.
     */
    public void receivedPublishQos2(MQTTConnection mQTTConnection, MqttPublishMessage mqttPublishMessage, String str) {
        LOG.trace("Processing PUBREL message on connection: {}", mQTTConnection);
        Topic topic = new Topic(mqttPublishMessage.variableHeader().topicName());
        ByteBuf payload = mqttPublishMessage.payload();
        if (!this.authorizator.canWrite(topic, str, mQTTConnection.getClientId())) {
            LOG.error("MQTT client is not authorized to publish on topic: {}", topic);
            return;
        }
        publish2Subscribers(payload, topic, MqttQoS.EXACTLY_ONCE);
        if (mqttPublishMessage.fixedHeader().isRetain()) {
            retainOrClean(topic, mqttPublishMessage, payload);
        }
        this.interceptor.notifyTopicPublished(mqttPublishMessage, mQTTConnection.getClientId(), str);
    }

    /**
     * Processes a SUBSCRIBE: runs the read-access verification, registers every request
     * whose resulting QoS is not {@link MqttQoS#FAILURE} in the subscriptions directory and
     * in the client's session, sends the SUBACK, delivers matching retained messages and
     * finally notifies the interceptor of each new subscription.
     *
     * @param mqttSubscribeMessage the received SUBSCRIBE
     * @param str                  client id owning the new subscriptions
     * @param str2                 second argument of the read-access verification and of the subscribed notification
     * @param mQTTConnection       connection on which the SUBACK is sent
     */
    public void subscribeClientToTopics(MqttSubscribeMessage mqttSubscribeMessage, final String str, String str2, MQTTConnection mQTTConnection) {
        int messageId = Utils.messageId(mqttSubscribeMessage);
        List<MqttTopicSubscription> verifiedRequests = this.authorizator.verifyTopicsReadAccess(str, str2, mqttSubscribeMessage);
        // The SUBACK covers every request, including the ones filtered out below.
        MqttSubAckMessage subAck = doAckMessageFromValidateFilters(verifiedRequests, messageId);
        List<Subscription> newSubscriptions = verifiedRequests.stream()
                .filter(PostOffice::lambda$subscribeClientToTopics$0)
                .map(request -> lambda$subscribeClientToTopics$1(str, request))
                .collect(Collectors.toList());
        for (Subscription subscription : newSubscriptions) {
            this.subscriptions.add(subscription);
        }
        this.sessionRegistry.retrieve(str).addSubscriptions(newSubscriptions);
        mQTTConnection.sendSubAckMessage(messageId, subAck);
        publishRetainedMessagesForSubscriptions(str, newSubscriptions);
        for (Subscription subscription : newSubscriptions) {
            this.interceptor.notifyTopicSubscribed(subscription, str2);
        }
    }

    /**
     * Removes the connection's client from each topic filter in {@code list}, notifying the
     * interceptor per filter, then sends the UNSUBACK. Processing stops at the first invalid
     * filter: the connection is dropped and no UNSUBACK is sent, but filters handled before
     * it stay removed.
     *
     * @param list           topic filters to unsubscribe from
     * @param mQTTConnection connection of the unsubscribing client
     * @param i              message id passed to the UNSUBACK
     */
    public void unsubscribe(List<String> list, MQTTConnection mQTTConnection, int i) {
        String clientId = mQTTConnection.getClientId();
        for (String rawFilter : list) {
            Topic topic = new Topic(rawFilter);
            if (!topic.isValid()) {
                mQTTConnection.dropConnection();
                LOG.warn("Topic filter is not valid. topics: {}, offending topic filter: {}", list, topic);
                return;
            }
            LOG.trace("Removing subscription topic={}", topic);
            this.subscriptions.removeSubscription(topic, clientId);
            this.interceptor.notifyTopicUnsubscribed(topic.toString(), clientId, NettyUtils.userName(mQTTConnection.channel));
        }
        mQTTConnection.sendUnsubAckMessage(list, clientId, i);
    }
}
