001/* 002 * Copyright (C) Photon Vision. 003 * 004 * This program is free software: you can redistribute it and/or modify 005 * it under the terms of the GNU General Public License as published by 006 * the Free Software Foundation, either version 3 of the License, or 007 * (at your option) any later version. 008 * 009 * This program is distributed in the hope that it will be useful, 010 * but WITHOUT ANY WARRANTY; without even the implied warranty of 011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 012 * GNU General Public License for more details. 013 * 014 * You should have received a copy of the GNU General Public License 015 * along with this program. If not, see <https://www.gnu.org/licenses/>. 016 */ 017 018package org.photonvision.server; 019 020import com.fasterxml.jackson.core.JsonProcessingException; 021import com.fasterxml.jackson.core.type.TypeReference; 022import com.fasterxml.jackson.databind.ObjectMapper; 023import io.javalin.websocket.WsBinaryMessageContext; 024import io.javalin.websocket.WsCloseContext; 025import io.javalin.websocket.WsConnectContext; 026import io.javalin.websocket.WsContext; 027import java.io.IOException; 028import java.net.InetSocketAddress; 029import java.nio.ByteBuffer; 030import java.time.Duration; 031import java.util.ArrayList; 032import java.util.HashMap; 033import java.util.List; 034import java.util.Map; 035import java.util.concurrent.CopyOnWriteArrayList; 036import org.apache.commons.lang3.tuple.Pair; 037import org.msgpack.jackson.dataformat.MessagePackFactory; 038import org.photonvision.common.dataflow.DataChangeDestination; 039import org.photonvision.common.dataflow.DataChangeService; 040import org.photonvision.common.dataflow.events.IncomingWebSocketEvent; 041import org.photonvision.common.hardware.HardwareManager; 042import org.photonvision.common.logging.LogGroup; 043import org.photonvision.common.logging.Logger; 044import org.photonvision.vision.pipeline.PipelineType; 045 046@SuppressWarnings("rawtypes") 047public class DataSocketHandler { 048 private final Logger logger = new Logger(DataSocketHandler.class, LogGroup.WebServer); 049 private final List<WsContext> users = new CopyOnWriteArrayList<>(); 050 private final ObjectMapper objectMapper = new ObjectMapper(new MessagePackFactory()); 051 private final DataChangeService dcService = DataChangeService.getInstance(); 052 053 @SuppressWarnings("FieldCanBeLocal") 054 private final UIOutboundSubscriber uiOutboundSubscriber = new UIOutboundSubscriber(this); 055 056 private static class ThreadSafeSingleton { 057 private static final DataSocketHandler INSTANCE = new DataSocketHandler(); 058 } 059 060 public static DataSocketHandler getInstance() { 061 return DataSocketHandler.ThreadSafeSingleton.INSTANCE; 062 } 063 064 private DataSocketHandler() { 065 dcService.addSubscribers( 066 uiOutboundSubscriber, 067 new UIInboundSubscriber()); // Subscribe outgoing messages to the data change service 068 } 069 070 public void onConnect(WsConnectContext context) { 071 users.add(context); 072 context.session.setIdleTimeout( 073 Duration.ofMillis(Long.MAX_VALUE)); // TODO: determine better value 074 var remote = (InetSocketAddress) context.session.getRemoteAddress(); 075 var host = remote.getAddress().toString() + ":" + remote.getPort(); 076 logger.info("New websocket connection from " + host); 077 dcService.publishEvent( 078 new IncomingWebSocketEvent<>( 079 DataChangeDestination.DCD_GENSETTINGS, "userConnected", context)); 080 } 081 082 protected void onClose(WsCloseContext context) { 083 users.remove(context); 084 var remote = (InetSocketAddress) context.session.getRemoteAddress(); 085 // Remote can be null if server is being closed for restart 086 if (remote != null) { 087 var host = remote.getAddress().toString() + ":" + remote.getPort(); 088 var reason = context.reason() != null ? context.reason() : "Connection closed by client"; 089 logger.info("Closing websocket connection from " + host + " for reason: " + reason); 090 } else { 091 logger.info("Closing websockets for user " + context.getSessionId()); 092 } 093 } 094 095 @SuppressWarnings({"unchecked"}) 096 public void onBinaryMessage(WsBinaryMessageContext context) { 097 try { 098 Map<String, Object> deserializedData = 099 objectMapper.readValue(context.data(), new TypeReference<>() {}); 100 101 // Special case the current camera index 102 String cameraUniqueName = ""; 103 if (deserializedData.get("cameraUniqueName") instanceof String camUniqueNameValue) { 104 cameraUniqueName = camUniqueNameValue; 105 deserializedData.remove("cameraUniqueName"); 106 } 107 108 for (Map.Entry<String, Object> entry : deserializedData.entrySet()) { 109 try { 110 var entryKey = entry.getKey(); 111 var entryValue = entry.getValue(); 112 var socketMessageType = DataSocketMessageType.fromEntryKey(entryKey); 113 114 logger.trace( 115 () -> 116 "Got WS message: [" 117 + socketMessageType 118 + "] ==> [" 119 + entryKey 120 + "], [" 121 + entryValue 122 + "]"); 123 124 if (socketMessageType == null) { 125 logger.warn("Got unknown socket message type: " + entryKey); 126 continue; 127 } 128 129 switch (socketMessageType) { 130 case SMT_DRIVERMODE -> { 131 // TODO: what is this event? 132 var data = (Boolean) entryValue; 133 var dmIsDriverEvent = 134 new IncomingWebSocketEvent<Boolean>( 135 DataChangeDestination.DCD_ACTIVEMODULE, 136 "isDriverMode", 137 data, 138 cameraUniqueName, 139 context); 140 141 dcService.publishEvents(dmIsDriverEvent); 142 } 143 case SMT_CHANGECAMERANAME -> { 144 var ccnEvent = 145 new IncomingWebSocketEvent<>( 146 DataChangeDestination.DCD_ACTIVEMODULE, 147 "cameraNickname", 148 (String) entryValue, 149 cameraUniqueName, 150 context); 151 dcService.publishEvent(ccnEvent); 152 } 153 case SMT_CHANGEPIPELINENAME -> { 154 var cpnEvent = 155 new IncomingWebSocketEvent<>( 156 DataChangeDestination.DCD_ACTIVEMODULE, 157 "pipelineName", 158 (String) entryValue, 159 cameraUniqueName, 160 context); 161 dcService.publishEvent(cpnEvent); 162 } 163 case SMT_ADDNEWPIPELINE -> { 164 // HashMap<String, Object> data = (HashMap<String, Object>) entryValue; 165 // var type = (PipelineType) data.get("pipelineType"); 166 // var name = (String) data.get("pipelineName"); 167 var arr = (ArrayList<Object>) entryValue; 168 var name = (String) arr.get(0); 169 var type = PipelineType.values()[(Integer) arr.get(1) + 2]; 170 171 var newPipelineEvent = 172 new IncomingWebSocketEvent<>( 173 DataChangeDestination.DCD_ACTIVEMODULE, 174 "newPipelineInfo", 175 Pair.of(name, type), 176 cameraUniqueName, 177 context); 178 dcService.publishEvent(newPipelineEvent); 179 } 180 case SMT_CHANGEBRIGHTNESS -> { 181 HardwareManager.getInstance() 182 .setBrightnessPercent(Integer.parseInt(entryValue.toString())); 183 } 184 case SMT_DUPLICATEPIPELINE -> { 185 var pipeIndex = (Integer) entryValue; 186 187 logger.info("Duplicating pipe@index" + pipeIndex + " for camera " + cameraUniqueName); 188 189 var newPipelineEvent = 190 new IncomingWebSocketEvent<>( 191 DataChangeDestination.DCD_ACTIVEMODULE, 192 "duplicatePipeline", 193 pipeIndex, 194 cameraUniqueName, 195 context); 196 dcService.publishEvent(newPipelineEvent); 197 } 198 case SMT_DELETECURRENTPIPELINE -> { 199 var deleteCurrentPipelineEvent = 200 new IncomingWebSocketEvent<>( 201 DataChangeDestination.DCD_ACTIVEMODULE, 202 "deleteCurrPipeline", 203 0, 204 cameraUniqueName, 205 context); 206 dcService.publishEvent(deleteCurrentPipelineEvent); 207 } 208 case SMT_ROBOTOFFSETPOINT -> { 209 var robotOffsetPointEvent = 210 new IncomingWebSocketEvent<>( 211 DataChangeDestination.DCD_ACTIVEMODULE, 212 "robotOffsetPoint", 213 (Integer) entryValue, 214 cameraUniqueName, 215 null); 216 dcService.publishEvent(robotOffsetPointEvent); 217 } 218 case SMT_CURRENTCAMERA -> { 219 var changeCurrentCameraEvent = 220 new IncomingWebSocketEvent<>( 221 DataChangeDestination.DCD_OTHER, "changeUICamera", (Integer) entryValue); 222 dcService.publishEvent(changeCurrentCameraEvent); 223 } 224 case SMT_CURRENTPIPELINE -> { 225 var changePipelineEvent = 226 new IncomingWebSocketEvent<>( 227 DataChangeDestination.DCD_ACTIVEMODULE, 228 "changePipeline", 229 (Integer) entryValue, 230 cameraUniqueName, 231 context); 232 dcService.publishEvent(changePipelineEvent); 233 } 234 case SMT_STARTPNPCALIBRATION -> { 235 var changePipelineEvent = 236 new IncomingWebSocketEvent<>( 237 DataChangeDestination.DCD_ACTIVEMODULE, 238 "startCalibration", 239 (Map) entryValue, 240 cameraUniqueName, 241 context); 242 dcService.publishEvent(changePipelineEvent); 243 } 244 case SMT_SAVEINPUTSNAPSHOT -> { 245 var takeInputSnapshotEvent = 246 new IncomingWebSocketEvent<>( 247 DataChangeDestination.DCD_ACTIVEMODULE, 248 "saveInputSnapshot", 249 0, 250 cameraUniqueName, 251 context); 252 dcService.publishEvent(takeInputSnapshotEvent); 253 } 254 case SMT_SAVEOUTPUTSNAPSHOT -> { 255 var takeOutputSnapshotEvent = 256 new IncomingWebSocketEvent<>( 257 DataChangeDestination.DCD_ACTIVEMODULE, 258 "saveOutputSnapshot", 259 0, 260 cameraUniqueName, 261 context); 262 dcService.publishEvent(takeOutputSnapshotEvent); 263 } 264 case SMT_TAKECALIBRATIONSNAPSHOT -> { 265 var takeCalSnapshotEvent = 266 new IncomingWebSocketEvent<>( 267 DataChangeDestination.DCD_ACTIVEMODULE, 268 "takeCalSnapshot", 269 0, 270 cameraUniqueName, 271 context); 272 dcService.publishEvent(takeCalSnapshotEvent); 273 } 274 case SMT_PIPELINESETTINGCHANGE -> { 275 HashMap<String, Object> data = (HashMap<String, Object>) entryValue; 276 277 if (data.size() >= 2) { 278 var cameraIndex2 = (String) data.get("cameraUniqueName"); 279 for (var dataEntry : data.entrySet()) { 280 if (dataEntry.getKey().equals("cameraUniqueName")) { 281 continue; 282 } 283 var pipelineSettingChangeEvent = 284 new IncomingWebSocketEvent( 285 DataChangeDestination.DCD_ACTIVEPIPELINESETTINGS, 286 dataEntry.getKey(), 287 dataEntry.getValue(), 288 cameraIndex2, 289 context); 290 dcService.publishEvent(pipelineSettingChangeEvent); 291 } 292 } else { 293 logger.warn("Unknown message for PSC: " + data.keySet().iterator().next()); 294 } 295 } 296 case SMT_CHANGEPIPELINETYPE -> { 297 var changePipelineEvent = 298 new IncomingWebSocketEvent<>( 299 DataChangeDestination.DCD_ACTIVEMODULE, 300 "changePipelineType", 301 (Integer) entryValue, 302 cameraUniqueName, 303 context); 304 dcService.publishEvent(changePipelineEvent); 305 } 306 } 307 } catch (Exception e) { 308 logger.error("Failed to parse message!", e); 309 } 310 } 311 } catch (IOException e) { 312 logger.error("Failed to deserialize message!", e); 313 } 314 } 315 316 private void sendMessage(ByteBuffer b, WsContext user) throws JsonProcessingException { 317 if (user.session.isOpen()) { 318 user.send(b); 319 } 320 } 321 322 public void broadcastMessage(Object message, WsContext userToSkip) 323 throws JsonProcessingException { 324 ByteBuffer b = ByteBuffer.wrap(objectMapper.writeValueAsBytes(message)); 325 326 if (userToSkip == null) { 327 for (WsContext user : users) { 328 sendMessage(b, user); 329 } 330 } else { 331 var skipUserPort = ((InetSocketAddress) userToSkip.session.getRemoteAddress()).getPort(); 332 for (WsContext user : users) { 333 var userPort = ((InetSocketAddress) user.session.getRemoteAddress()).getPort(); 334 if (userPort != skipUserPort) { 335 sendMessage(b, user); 336 } 337 } 338 } 339 } 340}