001/*
002 * Copyright (C) Photon Vision.
003 *
004 * This program is free software: you can redistribute it and/or modify
005 * it under the terms of the GNU General Public License as published by
006 * the Free Software Foundation, either version 3 of the License, or
007 * (at your option) any later version.
008 *
009 * This program is distributed in the hope that it will be useful,
010 * but WITHOUT ANY WARRANTY; without even the implied warranty of
011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
012 * GNU General Public License for more details.
013 *
014 * You should have received a copy of the GNU General Public License
015 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
016 */
017
018package org.photonvision.common.dataflow;
019
020import java.util.concurrent.BlockingQueue;
021import java.util.concurrent.CopyOnWriteArrayList;
022import java.util.concurrent.LinkedBlockingQueue;
023import java.util.stream.Collectors;
024import org.photonvision.common.dataflow.events.DataChangeEvent;
025import org.photonvision.common.logging.LogGroup;
026import org.photonvision.common.logging.Logger;
027
028@SuppressWarnings("rawtypes")
029public class DataChangeService {
030    private static final Logger logger = new Logger(DataChangeService.class, LogGroup.WebServer);
031
032    public static class SubscriberHandle {
033        private final int[] idxs;
034
035        private SubscriberHandle(int[] idxs) {
036            this.idxs = idxs;
037        }
038
039        private SubscriberHandle(int idx) {
040            this.idxs = new int[] {idx};
041        }
042
043        public void stop() {
044            for (int idx : idxs) {
045                if (idx < 0) continue;
046                getInstance().subscribers.set(idx, null);
047            }
048        }
049    }
050
051    private static class ThreadSafeSingleton {
052        private static final DataChangeService INSTANCE = new DataChangeService();
053    }
054
055    public static DataChangeService getInstance() {
056        return ThreadSafeSingleton.INSTANCE;
057    }
058
059    private final CopyOnWriteArrayList<DataChangeSubscriber> subscribers;
060
061    @SuppressWarnings("FieldCanBeLocal")
062    private final Thread dispatchThread;
063
064    private final BlockingQueue<DataChangeEvent> eventQueue = new LinkedBlockingQueue<>();
065
066    private DataChangeService() {
067        subscribers = new CopyOnWriteArrayList<>();
068        dispatchThread = new Thread(this::dispatchFromQueue);
069        dispatchThread.setName("DataChangeEventDispatchThread");
070        dispatchThread.start();
071    }
072
073    public boolean hasEvents() {
074        return !eventQueue.isEmpty();
075    }
076
077    private void dispatchFromQueue() {
078        while (true) {
079            try {
080                var taken = eventQueue.take();
081                for (var sub : subscribers) {
082                    if (sub == null) continue;
083                    if (sub.wantedSources.contains(taken.sourceType)
084                            && sub.wantedDestinations.contains(taken.destType)) {
085                        sub.onDataChangeEvent(taken);
086                    }
087                }
088            } catch (Exception e) {
089                logger.error("Exception when dispatching event!", e);
090                e.printStackTrace();
091            }
092        }
093    }
094
095    public SubscriberHandle addSubscriber(DataChangeSubscriber subscriber) {
096        if (!subscribers.addIfAbsent(subscriber)) {
097            logger.warn("Attempted to add already added subscriber!");
098            return new SubscriberHandle(-1);
099        } else {
100            logger.debug(
101                    () -> {
102                        var sources =
103                                subscriber.wantedSources.stream()
104                                        .map(Enum::toString)
105                                        .collect(Collectors.joining(", "));
106                        var dests =
107                                subscriber.wantedDestinations.stream()
108                                        .map(Enum::toString)
109                                        .collect(Collectors.joining(", "));
110
111                        return "Added subscriber - " + "Sources: " + sources + ", Destinations: " + dests;
112                    });
113            return new SubscriberHandle(subscribers.size() - 1);
114        }
115    }
116
117    public SubscriberHandle addSubscribers(DataChangeSubscriber... subs) {
118        int[] idxs = new int[subs.length];
119        for (int i = 0; i < subs.length; i++) {
120            idxs[i] = addSubscriber(subs[i]).idxs[0];
121        }
122        return new SubscriberHandle(idxs);
123    }
124
125    public void publishEvent(DataChangeEvent event) {
126        eventQueue.offer(event);
127    }
128
129    public void publishEvents(DataChangeEvent... events) {
130        for (var event : events) {
131            publishEvent(event);
132        }
133    }
134}