Организация вебсокетного взаимодействия с приложением Spring

в 16:15, , рубрики: java

Скажу сразу, что стандартная реализация такого взаимодействия — существует.

Однако, поскольку эта статья — продолжение темы "Простой вызов удалённых сервисных методов в одностраничных приложениях", здесь будет приведена альтернативная схема взаимодействия, необходимая для замены ajax на вебсокеты, в контексте подхода(jrspc), описанного в вышеупомянутой теме.

В первой статье — был описан механизм вызова сервисных методов, с использованием ajax.

В этой статье — описано, как можно реализовать данный механизм, с заменой ajax на вебсокеты,
не меняя код бизнес-логики приложения.

Такая замена даёт более быстрое соединение(тесты в конце), экономию серверной памяти, и добавляет возможность вызывать методы клиента с сервера.

Для демонстрации, написано небольшое чат-приложение, с исходным кодом на гитхабе.
на примере разбора которого, я попытаюсь объяснить, как реализованы клиентская и серверная части такого взаимодействия.
Приложение работает на сервере tomcat 7.042.
Поддерживает https и wss (сертификат неподтверждённый), и не ведёт логов на сервере.

Серверная часть

Основным вопросом, возникшем при организации вебсокетного взаимодействия с приложением Spring, был вопрос — как обеспечить вызов компонентов Spring, из его областей видимостей, которые привязаны к http-сессии, из объекта StreamInbound, возвращаемого методом createWebSocketInbound класса WebSocketServlet, который к сессии не привязан?

Чтобы обеспечить требуемую функциональность по вызову методов серверных компонентов, нам нужно как-то получить доступ к рабочему ApplicationContext из наследника StreamInbound.

Если попытаемся заавтоваерить ApplicationContext, чтобы с его помощью получать нужные нам компоненты, в наследника WebSocketServlet или StreamInbound — нас ждёт разочарование, так как он не проинициализируется, что абсолютно законно.

Для того, чтобы из вебсокетного обработчика получить доступ к компонентам из контекстов Spring, которые связаны с http-сессией, нам нужно создать объект, который бы был сессионным спринговым бином, и который бы хранился в статическом объекте класса-хранилища, доступ к которому, имел бы наследник StreamInbound.

Этот сессионный объект (назовём его ClientManager), создаётся в процессе установки http соединения.

Соответственно, клиент, прежде чем начинать взаимодействовать с сервером через вебсокет, должен сделать один http handshake-запрос, в результате которого, он должен получить ид своего ClientManager.

Результат этого запроса можно передать в код клиента двумя способами — вставить clientManagerId в отдаваемую сгенерированную страницу, или через ajax запрос, со статической страницы (здесь — реализован вариант через ajax).

Обработка этого запроса производится в методе initializeClientManager сессионного контроллера:

@Controller
@Scope("session")
public class ClientManagerController {

    @Autowired   
    private ClientManager clientManager;                     
           
    @RequestMapping(value = "/init", method = RequestMethod.POST)
    @ResponseBody
    private String initializeClientManager(HttpSession session) {  
    
      JSONObject result = new JSONObject();       
        try{
            boolean loged = ClientManagersStorage.checkClientManager(clientManager, session) ;
            result.put("loged", loged);        
            result.put("clientManagerId", clientManager.getId());      
        }catch(Throwable th){
            result.put("error", th.toString()); 
        }         
        return result.toString();         
    }

ClientManagersStorage — это хранилище наших сессионных менеджеров клиентов, имеющее методы по проверке менеджера на null, созданию нового, добавлению в хранилище, поиску и удалению.

public class ClientManagersStorage {

    final static private  Map<String, ClientManager> clientManagers = new ConcurrentHashMap <String, ClientManager>();

    public static boolean checkClientManager(ClientManager clientManager, HttpSession session) {
        ClientManager registeredClientManager = clientManagers.get(clientManager.getId());
        if (registeredClientManager == null) {
            clientManager.setSession(session);
            addClientManager(clientManager);   
            registeredClientManager = clientManager;       
        }
        return registeredClientManager.getUser() != null;        
    }
    ...
}

(вопрос управления жизненным циклом сессии будет рассмотрен немного ниже)

Как видно, менеджеры хранятся в статической мапе, по ключу, являющемуся его hashCode, и когда пользователь перезагружает страницу — ему назначается тот же самый менеджер.

Ид этого менеджера — передаётся клиенту в переменной clientManagerId ответа.

После того, как клиент получил ид своего менеджера — он может открывать вебсокетное соединение, передавая свой clientManagerId в единственном параметре запроса на установку соединения.

Запрос на открытие этого соединения обрабатывается в методе createWebSocketInbound класса WebSocketConnectorServlet — имплементации абстрактного WebSocketServlet.

@Override
	protected StreamInbound createWebSocketInbound(String paramString, HttpServletRequest request) {
		String clientManagerId = request.getParameter("clientManagerId");		
		ClientManager clientManager = ClientManagersStorage.findClientManager(clientManagerId);
		if(clientManager == null){
		    return new WebSocketConnection(null);
		}		
		log.debug("new connection");
		return new WebSocketConnection(clientManager);
	}

в нём, из запроса достаётся clientManagerId, по нему находится ClientManager, и создаётся объект WebSocketConnection (являющийся StreamInbound), к которому привязан ClientManager.

Так как ClientManager — сессионный, и был создан в «нормальном» http запросе, то из него будут доступны все спринговые бины, через автоваеренный в него ApllicationContext, который, здесь, будет проинициализирован правильно).

При открытии нового соединения с клиентом, контейнером вызывается метод onOpen класса WebSocketConnection, в котором привязанный к нему ClientManager, добавляет этот WebSocketConnection, в свою мапу соединений, по ид(хэшкоду) объекта.

   @Override
    protected void onOpen(WsOutbound outbound) {
        if(clientManager != null){
           clientManager.addConnection(this);
        }    
    }

(Поддержка множества соединений необходима, чтобы пользователь мог открывать приложение в нескольких окнах, каждое из которых будет создавать своё вебсокетное соединение.)

Открыв соединение, клиент может слать запросы на вызовы серверных методов, которые будут обрабатываться в переопределённом методе onTextMessage класса WebSocketConnection.

    @Override
    protected void onTextMessage(CharBuffer message) throws IOException {
        try {                   
            String connectionId = String.valueOf(this.hashCode());
            String request = message.toString();
            clientManager.handleClientRequest(request, connectionId);            
        } catch (Throwable th) {
            log.error("in onTextMessage: " + th);
        }
    }

метод handleClientRequest класса ClientManager — обрабатывает запрос, и пишет результат в соединение:

@Autowired
    private RequestHandler requestHandler;
    
    public void handleClientRequest(String request, String connectionId) {
        log.debug("handleClientRequest request=" + request);
        log.debug("handleClientRequest user=" + getUser());   
        /** handleRequest - never throws exceptions ! */
        JSONObject response = requestHandler.handleRequest(request, this);        
        String responseJson = response.toString();
        CharBuffer buffer = CharBuffer.wrap(responseJson);
        WebSocketConnection connection = connections.get(connectionId);
        try {
            connection.getWsOutbound().writeTextMessage(buffer);
        } catch (IOException ioe) {
            log.error("in handleClientRequest: in writeTextMessage: " + ioe);
        }
    }

requestHandler — автоваеренный компонент, отвечающий за обработку запросов.
В него вварен ApllicationContext, при помощи которого он находит объекты сервисов.

Его метод handleRequest, ищет компонент сервиса, и вызывает на нём методы нужный клиенту, точно так же, как метод processAjaxRequest из класса CommonServiceController, из предыдущей статьи.

Такова общая схема взаимодействия.

Теперь рассмотрим подробнее момент инициализации ClientManager'а http сессией.

Сессия имеет свойство отваливаться по таймауту, который по умолчанию равен 30 минутам.
Чтобы избежать этого — выставляем его значение на максимум, и инвалидируем сессию когда нам это нужно — а именно, в двух случаях: первый случай — когда кто-то сделал запрос не из приложения, и второй, когда клиент закрыл страницу приложения.

Первый случай обрабатывается прямо в методе инициализации:

  public class ClientManager{
   
	    public void setSession(HttpSession session) {
	        /** session will be invalidated at connection removing */
	        session.setMaxInactiveInterval(Integer.MAX_VALUE);//69.04204112011317 years
	        this.session = session;
	        new Thread(new Runnable() {            
	            @Override
	            public void run() {
	                /** Giving time to client, for establish websocket connection. */
	                try {Thread.sleep(60000);} catch (InterruptedException ignored) {}
	                /** if client not connected via websocket until this time - it is bot */
	                if (connections.size() == 0) {removeMe();}                                
	            }            
	        }).start();        
	    }	    
	    private void removeMe() {ClientManagersStorage.removeClientManager(this);}
	    
    	...
  }  

а второй — в методе onClose класса WebSocketConnection:

public class WebSocketConnection{
	   @Override
	    protected void onClose(int status) {
	        if(clientManager != null){
	            clientManager.removeConnection(this);
	        }        
	    }
        ...
   }

   
   public class ClientManager{ 
   
	   public void removeConnection(WebSocketConnection webSocketConnection) {
	        String connectionId = getObjectHash(webSocketConnection);
	        connections.remove(connectionId);
	        if (connections.size() == 0) {
	            log.debug("removeConnection before wait:  connections.size()=" + connections.size());
	            /** may be client just reload page? */
	            try {Thread.sleep(waitForReloadTime);} catch (Throwable ignored) {}            
	            if (connections.size() == 0) {
	                /** no, client leave us (page closed in browser)*/      
	                ClientManagersStorage.removeClientManager(this); 
	                log.debug("client " + getId() + " disconnected");                    
	            }
	        }
	    }   
         ...
   }  
 

Сессия инвалидируется в методе removeClientManager класса ClientManagersStorage:

public static void removeClientManager(ClientManager clientManager) {        
        ClientManager removed  = clientManagers.remove(clientManager.getId());    
        if(removed == null){return;}
        User user = removed.getUser();
        if(user != null){                
            Broadcaster.broadcastCommand("userPanel.setLogedCount", UserService.logedCount.decrementAndGet());   
        }                
        Broadcaster.broadcastCommand("userPanel.setOnlineCount", ClientManagersStorage.getClientManagersCount());            
        try {
            clientManager.getSession().invalidate();
            clientManager.setSession(null);     
        } catch (Throwable th) {
            log.error("at removeClientManager: " + th);
        }        
    }  

Из этого же метода делается уведомление пользователей о том, что изменилось число посетителей страницы (обработка этих уведомлений на клиенте — описана ниже).

Для уведомления пользователей о событиях на сервере — используется класс Broadcaster, имеющий два метода: broadcastCommand и sendCommandToUser:

public class Broadcaster{
 
    public static void broadcastCommand(String method, Object params) {
        for (ClientManager clientManager : ClientManagersStorage.getClientManagers().values()) {
            clientManager.sendCommandToClient(method, params);
        }
    }    

    public static void sendCommandToUser(Long userId, String method, Object params) {     
        List<ClientManager> userClientManagers = ClientManagersStorage.findUserClientManagers(userId);
        for(ClientManager clientManager: userClientManagers){
            clientManager.sendCommandToClient(method, params);
        }        
    }
  }  

Метод sendCommandToClient класса СlientManager — работает так:

public void sendCommandToClient(String method, Object params) {
        for(WebSocketConnection connection: connections.values()){
            sendCommandToClientConnection(connection, method, params);             
        }        
    }    

    private void sendCommandToClientConnection(WebSocketConnection connection, String method, Object params) {
        JSONObject commandBody = new JSONObject();
        if(params == null){params = new JSONObject();}
        commandBody.put("method", method);
        commandBody.put("params", params);        
        CharBuffer buffer = CharBuffer.wrap(commandBody.toString());
        try {
            connection.getWsOutbound().writeTextMessage(buffer);                     
        } catch (IOException ioe) {
            log.error("in sendCommandToClient: in writeTextMessage: " + ioe);
        }                
    }

На этом, с серверной частью закончим, и перейдём к клиентской.

Клиентская часть

Клиентская часть должна реализовать три функциональности:

первая — handshake на ajax, для инициализации сессионного СlientManager'а, вторая — вебсокетный транспорт, для отправки запросов jsrpc и получения на них ответов, и третья — вызов функций на клиенте, с сервера.

Первая часть — самая простая:

Поскольку мы используем Ангуляр, для инициализирующего http-сессию запроса ajax, используется $http:

	var appName = "jrspc-ws"; 
	var secured = document.location.protocol == "https:" ? "s" : "";
	var HttpSessionInitializer = {url: "http"+secured+"://"+ document.location.host +"/"+appName+"/init"};
	
	/** called from root-controller.js after its initialization */	
	HttpSessionInitializer.init = function($http) {	    	    	
	    	$http.post(this.url, "").success(function(response){
					if (response.error) {
						error(response.error);
					} else {					
						loged = response.loged;					
						Server.initialize("ws"+secured+"://"+ document.location.host +"/"+appName+"/ws?clientManagerId="+response.clientManagerId);
						if(loged){Listeners.notify("onLogin");}					
					}    		
	    	}).error(function() {error("network error!");});    	    	    	
	}

На сервере, этот запрос обрабатывается в методе initializeClientManager класса ClientManagerController, код которого приведён выше, в описании серверной части.

Инициализация сокетного соединения происходит в функции Server.initialize:

  	connector.initialize = function(url) {
		connector.url = url;
		try {
			connector.connect(url);
			return true;
		} catch (ex) {
			p("in connector.initialize: " + ex);
			return false;
		}
	}

connector — внутренний объект Server, который отвечает за вебсокетное соединение (его полный код находится в файле ws-connector.js)

Код из ws-connector.js, который отвечает за формирование запроса jrspc:

  	Server.socketRequests = {};

	var requestId = 0;

	function sendSocket(service, method, params, successCallback, errorCallback, control) {
		if (!checkSocket()) {return;}
		requestId++;
		
		if(!params){params = [];}
		if(!isArray(params)){params = [params];}
		
		var data = {
			service : service,
			method : method,
			params : params,
			requestId : requestId
		};
		Server.socketRequests["request_" + requestId] = {
			successCallback : successCallback,
			errorCallback : errorCallback,
			control : control
		};

		if (control) {control.disabled = true;}
	
		var message = JSON.stringify(data);
		log("sendSocket: "+message);
		connector.socket.send(message);
	}
	...
	Server.call = sendSocket;

Код из ws-connector.js, который отвечает за обработку ответов на запросы, и обработку серверных команд:

 		connector.socket.onmessage = function(message) {
			var data = message.data;
			var response = JSON.parse(data);
			var requestId = response.requestId;
			if (requestId) {/** server return response */					
				var control = Server.socketRequests["request_" + requestId].control;
				if (control) {control.disabled = false;}									
				if (response.error) {
					var errorCallback = Server.socketRequests["request_" + requestId].errorCallback;
					if (errorCallback) {
						try {
							errorCallback(response.error);
						} catch (ex) {
							error("in connector.socket.onmessage errorCallback: " + ex + ", data=" + data);
						}
					}else{
						error(response.error);
					}
				} else {	
					var successCallback = Server.socketRequests["request_" + requestId].successCallback;
					if (successCallback) {
						try {
							successCallback(response.result);
						} catch (ex) {
							error("in connector.socket.onmessage successCallback: " + ex + ", data=" + data);
						}
					}
				}
				delete Server.socketRequests["request_" + requestId];
			} else {
				/** server call client or broadcast */
				var method = eval(response.method);
				var params = response.params;
				try {
					method(params);
				} catch (ex) {
					error("in connector.socket.onmessage call method: " + ex + ", data=" + data);
				}
			}
		}; 

Применение вышеописанного фреймворка, позволяет реализовать всю бизнес логику, отвечающую за функциональность чата — в двух функциях на клиенте (chat-controller.js):

		self.sendMessage = function(command){
	    	var message = {to: (self.sendPrivate ? self.privateTo : "all"), from: userPanel.user.login, text: self.newMessage, clientTime: new Date().getTime()};
	     	Server.call("chatService", "dispatchMessage", message,
	    	function(){	self.newMessage = ""; self.$digest(); }, function(error){self.onError(error);}, command);
	    } 
	               
	    /** called from server */
	    self.onChatMessage = function (message){ 	
	    	message.isPrivate = (message.to != "all");
	    	self.messages.push(message);
	    	self.$digest();	
	    	chatConsole.scrollTop = chatConsole.clientHeight + chatConsole.scrollHeight;    	
	    } 

и одном серверном методе:

		@Component
	    public class ChatService extends AbstractService{
	       
		    @Autowired
		    private UserManager userManager;
		    
		    @Secured("User")
		    @Remote
		    public void dispatchMessage(ChatMessage message){ 
		        message.setServerTime(new Date().getTime());  
		        String to = message.getTo();
		        if("ALL".equalsIgnoreCase(to)){                   
		            Broadcaster.broadcastCommand("chatPanel.onChatMessage", message);
		        }else{            
		            User fromUser = getUser();
		            message.setFrom(fromUser.getLogin());
		            User toUser = userManager.findByLogin(to);    
		            if(toUser == null){throw new RuntimeException("User "+to+" not found!");}             
		            Broadcaster.sendCommandToUser(toUser.getId(), "chatPanel.onChatMessage", message);        
		            Broadcaster.sendCommandToUser(fromUser.getId(), "chatPanel.onChatMessage", message);     
		        }                
		    }                       
	    }

Тесты на скорость, при последовательной и параллельной отправке 1000 запросов, для ajax и websockets:

последовательно: ajax (3474, 3380, 3377) ws (1299, 1113, 1054)
параллельно: ajax (1502, 1515, 1469) ws (616, 637, 632)

код тестов

		function testController($scope){	
			var self = $scope;	
		   
		    self.maxIterations = 1000;
		    self.testIterations = self.maxIterations;
		    self.testStart = 0;
		    self.testEnd = 0;
		        
		    self.testForSpeedSerial = function(command){
		    	if(self.testStart == 0){self.testStart = now();}
		    	if(--self.testIterations <= 0){
		    		var duration = now() - self.testStart;
		    		alert("testForSpeedSerial duration="+duration);    
		    		self.testStart = 0;
		    		self.testIterations = self.maxIterations;
		    		return;
		    	}
		    	Server.call("userService", "testForSpeed", "", function(){ self.testForSpeedSerial(command); }, error, command);    	
		    }
		    
		    self.testForSpeedParallelResponses = 0;
		    
		    self.testForSpeedParallel = function(command){	
		    	self.testStart = now();    	
		    	for(var i = 0; i < self.testIterations; i++){
		    		Server.call("userService", "testForSpeed", "", 
		    				function(){
		    			       self.testForSpeedParallelResponses++ ; 
		    			       if(self.testForSpeedParallelResponses >= self.maxIterations){
		    			    	      	var duration = now() - self.testStart;
								    	alert("testForSpeedParallel duration="+duration);    		
								    	self.testForSpeedParallelResponses = 0;
		    			       }
		    				}, error, command); 
		    	}    	
		    } 
		}
   

серверный метод testForSpeed:

     @Remote public void testForSpeed(){} 

Все критические замечания и указания на ошибки буду приняты с благодарностью.

Автор: Metalfire

Источник

Поделиться

* - обязательные к заполнению поля