/*
 * Copyright (C) Photon Vision.
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

package org.photonvision.common.dataflow;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import org.photonvision.common.dataflow.events.DataChangeEvent;
import org.photonvision.common.logging.LogGroup;
import org.photonvision.common.logging.Logger;

/**
 * Process-wide event bus: events handed to {@link #publishEvent} are queued and delivered, on a
 * single dedicated thread, to every registered {@link DataChangeSubscriber} whose wanted sources
 * and wanted destinations both contain the event's source/destination type.
 *
 * <p>Obtain the shared instance through {@link #getInstance()}.
 */
@SuppressWarnings("rawtypes")
public class DataChangeService {
    private static final Logger logger = new Logger(DataChangeService.class, LogGroup.WebServer);

    /**
     * Token returned by the {@code addSubscriber(s)} methods; calling {@link #stop()} unregisters
     * the subscriber(s) that the originating call actually added.
     */
    public static class SubscriberHandle {
        // Subscribers registered by the call that produced this handle. Entries are null for
        // subscribers that were rejected (null or duplicate), so stop() leaves those alone.
        private final DataChangeSubscriber[] owned;

        // Guards against a stale handle removing a subscriber that was re-registered later.
        private boolean stopped;

        private SubscriberHandle(DataChangeSubscriber... owned) {
            this.owned = owned;
        }

        /**
         * Unregisters every subscriber owned by this handle. Calling it more than once, or on a
         * handle returned for a rejected registration, has no effect.
         */
        public synchronized void stop() {
            if (stopped) {
                return;
            }
            stopped = true;

            var registered = getInstance().subscribers;
            for (var sub : owned) {
                if (sub == null) {
                    continue;
                }
                // Remove by identity: the handle only ever owns the exact instance it added, so
                // an equal-but-distinct subscriber registered by someone else is never touched.
                registered.removeIf(candidate -> candidate == sub);
            }
        }
    }

    /** Initialization-on-demand holder; the JVM creates the instance on first access. */
    private static class ThreadSafeSingleton {
        private static final DataChangeService INSTANCE = new DataChangeService();
    }

    /**
     * Returns the shared service instance.
     *
     * @return the singleton {@code DataChangeService}
     */
    public static DataChangeService getInstance() {
        return ThreadSafeSingleton.INSTANCE;
    }

    private final CopyOnWriteArrayList<DataChangeSubscriber> subscribers;

    @SuppressWarnings("FieldCanBeLocal")
    private final Thread dispatchThread;

    private final BlockingQueue<DataChangeEvent> eventQueue = new LinkedBlockingQueue<>();

    /** Creates the subscriber list and starts the dispatch thread. */
    private DataChangeService() {
        subscribers = new CopyOnWriteArrayList<>();
        dispatchThread = new Thread(this::dispatchFromQueue);
        dispatchThread.setName("DataChangeEventDispatchThread");
        dispatchThread.start();
    }

    /**
     * Reports whether any published events are still waiting in the queue.
     *
     * @return {@code true} if the event queue is not empty
     */
    public boolean hasEvents() {
        return !eventQueue.isEmpty();
    }

    /**
     * Body of the dispatch thread: blocks for the next queued event and offers it to each
     * subscriber. Runs until the thread is interrupted.
     */
    private void dispatchFromQueue() {
        while (true) {
            DataChangeEvent taken;
            try {
                taken = eventQueue.take();
            } catch (InterruptedException e) {
                // An interrupt is a request to stop: restore the flag and leave the loop instead
                // of swallowing it and blocking again.
                Thread.currentThread().interrupt();
                logger.warn("Event dispatch thread interrupted, stopping dispatch");
                return;
            }

            for (var sub : subscribers) {
                // Isolate each subscriber so that one throwing handler does not prevent the
                // remaining subscribers from seeing this event.
                try {
                    if (sub.wantedSources.contains(taken.sourceType)
                            && sub.wantedDestinations.contains(taken.destType)) {
                        sub.onDataChangeEvent(taken);
                    }
                } catch (Exception e) {
                    logger.error("Exception when dispatching event!", e);
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * Adds {@code subscriber} to the subscriber list unless it is {@code null} or an equal
     * subscriber is already present; a warning is logged in both rejection cases.
     *
     * @param subscriber the subscriber to add
     * @return {@code true} if the subscriber was added, {@code false} if it was rejected
     */
    private boolean register(DataChangeSubscriber subscriber) {
        if (subscriber == null) {
            logger.warn("Attempted to add null subscriber!");
            return false;
        }
        if (!subscribers.addIfAbsent(subscriber)) {
            logger.warn("Attempted to add already added subscriber!");
            return false;
        }

        logger.debug(
                () -> {
                    var sources =
                            subscriber.wantedSources.stream()
                                    .map(Enum::toString)
                                    .collect(Collectors.joining(", "));
                    var dests =
                            subscriber.wantedDestinations.stream()
                                    .map(Enum::toString)
                                    .collect(Collectors.joining(", "));

                    return "Added subscriber - Sources: " + sources + ", Destinations: " + dests;
                });
        return true;
    }

    /**
     * Registers a subscriber for future events.
     *
     * @param subscriber the subscriber to register
     * @return a handle whose {@link SubscriberHandle#stop()} unregisters the subscriber; if the
     *     subscriber was {@code null} or already registered, the handle does nothing
     */
    public SubscriberHandle addSubscriber(DataChangeSubscriber subscriber) {
        return register(subscriber) ? new SubscriberHandle(subscriber) : new SubscriberHandle();
    }

    /**
     * Registers several subscribers at once.
     *
     * @param subs the subscribers to register
     * @return a single handle whose {@link SubscriberHandle#stop()} unregisters every subscriber
     *     that this call actually added (rejected ones are skipped)
     */
    public SubscriberHandle addSubscribers(DataChangeSubscriber... subs) {
        // Fresh array: the handle must not alias the caller's varargs array.
        var added = new DataChangeSubscriber[subs.length];
        for (int i = 0; i < subs.length; i++) {
            if (register(subs[i])) {
                added[i] = subs[i];
            }
        }
        return new SubscriberHandle(added);
    }

    /**
     * Queues an event for asynchronous delivery on the dispatch thread.
     *
     * @param event the event to publish
     */
    public void publishEvent(DataChangeEvent event) {
        eventQueue.offer(event);
    }

    /**
     * Queues several events, in the given order, for asynchronous delivery.
     *
     * @param events the events to publish
     */
    public void publishEvents(DataChangeEvent... events) {
        for (var event : events) {
            publishEvent(event);
        }
    }
}