admin 管理员组文章数量: 1086019
2024年2月24日发(作者:图片设计模板免费下载)
(CLASS_NAME,methodName,"101",new Object[]{clientId,serverURI,persistence}); (clientId, serverURI); // 【: 2】 = new ClientComms(this, tence, pingSender); // 【: 3】 (); = new Hashtable(); }
protected NetworkModule[] createNetworkModules(String address, MqttConnectOptions options) throws MqttException, MqttSecurityException { // 【: 4】 final String methodName = "createNetworkModules"; // @TRACE 116=URI={0} (CLASS_NAME, methodName, "116", new Object[]{address}); NetworkModule[] networkModules = null; String[] serverURIs = verURIs(); String[] array = null; if (serverURIs == null) { array = new String[]{address}; } else if ( == 0) { array = new String[]{address}; } else { array = serverURIs; } networkModules = new NetworkModule[]; for (int i = 0; i < ; i++) { networkModules[i] = createNetworkModule(array[i], options); } (CLASS_NAME, methodName, "108"); return networkModules; } private NetworkModule createNetworkModule(String address, MqttConnectOptions options) throws MqttException, MqttSecurityException {
final String methodName = "createNetworkModule"; // @TRACE 115=URI={0} (CLASS_NAME,methodName, "115", new Object[] {address}); NetworkModule netModule; String shortAddress; String host; int port; SocketFactory factory = ketFactory(); int serverURIType = teURI(address); // 【: 5】 switch (serverURIType) { case _TYPE_TCP : shortAddress = ing(6); host = getHostName(shortAddress); port = getPort(shortAddress, 1883); if (factory == null) { factory = ault(); } else if (factory instanceof SSLSocketFactory) { throw MqttException(_CODE_SOCKET_FACTORY_MISMATCH); } netModule = new TCPNetworkModule(factory, host, port, clientId); // 【: 7】 ((TCPNetworkModule)netModule).setConnectTimeout(nectionTimeout()); break; case _TYPE_SSL: shortAddress = ing(6); host = getHostName(shortAddress); port = getPort(shortAddress, 8883); SSLSocketFactoryFactory factoryFactory = null; if (factory == null) {// try { factoryFactory = new SSLSocketFactoryFactory(); Properties sslClientProps = Properties(); if (null != sslClientProps) lize(sslClientProps, null); factory = SocketFactory(null);// }// catch (MqttDirectException ex) {// throw MqttException(se());// } } else if ((factory instanceof SSLSocketFactory) == false) { throw MqttException(_CODE_SOCKET_FACTORY_MISMATCH); } // Create the netModule = new SSLNetworkModule((SSLSocketFactory) factory, host, port, clientId); ((SSLNetworkModule)netModule).setSSLhandshakeTimeout(nectionTimeout()); // Ciphers suites need to be set, if they are available if (factoryFactory != null) { String[] enabledCiphers = bledCipherSuites(null); if (enabledCiphers != null) { ((SSLNetworkModule) netModule).setEnabledCiphers(enabledCiphers); } } break; case _TYPE_WS: shortAddress = ing(5); host = getHostName(shortAddress); port = getPort(shortAddress, 80); if (factory == null) {
factory = ault(); } else if (factory instanceof SSLSocketFactory) { throw MqttException(_CODE_SOCKET_FACTORY_MISMATCH); } netModule = new WebSocketNetworkModule(factory, address, host, port, clientId); ((WebSocketNetworkModule)netModule).setConnectTimeout(nectionTimeout()); break; case _TYPE_WSS: shortAddress = ing(6); host = getHostName(shortAddress); port = getPort(shortAddress, 443); SSLSocketFactoryFactory wSSFactoryFactory = null; if (factory == null) { wSSFactoryFactory = new SSLSocketFactoryFactory(); Properties sslClientProps = Properties(); if (null != sslClientProps) lize(sslClientProps, null); factory = SocketFactory(null); } else if ((factory instanceof SSLSocketFactory) == false) { throw MqttException(_CODE_SOCKET_FACTORY_MISMATCH); } // Create the
netModule = new WebSocketSecureNetworkModule((SSLSocketFactory) factory, address, host, port, clientId); ((WebSocketSecureNetworkModule)netModule).setSSLhandshakeTimeout(nectionTimeout()); // Ciphers suites need to be set, if they are available if (wSSFactoryFactory != null) { String[] enabledCiphers = bledCipherSuites(null); if (enabledCiphers != null) { ((SSLNetworkModule) netModule).setEnabledCiphers(enabledCiphers); } } break; case _TYPE_LOCAL : netModule = new LocalNetworkModule(ing(8)); break; default: // This shouldn't happen, as long as validateURI() has been called. netModule = null; } return netModule; } .......
public IMqttToken connect(MqttConnectOptions options, Object userContext, IMqttActionListener callback) throws MqttException, MqttSecurityException { final String methodName = "connect"; if (ected()) { throw MqttException(_CODE_CLIENT_CONNECTED); } if (ecting()) { throw new MqttException(_CODE_CONNECT_IN_PROGRESS); } if (onnecting()) { throw new MqttException(_CODE_CLIENT_DISCONNECTING); } if (ed()) { throw new MqttException(_CODE_CLIENT_CLOSED); } ts = options; ntext = userContext; final boolean automaticReconnect = maticReconnect(); // @TRACE 103=cleanSession={0} connectionTimeout={1} TimekeepAlive={2} userName={3} password={4} will={5} userContext={6} callback={7} (CLASS_NAME,methodName, "103", new Object[]{ f(nSession()), new Integer(nectionTimeout()), new Integer(pAliveInterval()), rName(), ((null == sword())?"[null]":"[notnull]"), ((null == lMessage())?"[null]":"[notnull]"), userContext, callback }); workModules(createNetworkModules(serverURI, options)); onnectCallback(new MqttCallbackExtended() {
public void messageArrived(String topic, MqttMessage message) throws Exception { } public void deliveryComplete(IMqttDeliveryToken token) { } public void connectComplete(boolean reconnect, String serverURI) { } public void connectionLost(Throwable cause) { if(automaticReconnect){ // Automatic reconnect is set so make sure comms is in resting state tingState(true); reconnecting = true; startReconnectCycle(); } }
}); // Insert our own callback to iterate through the URIs till the connect succeeds MqttToken userToken = new MqttToken(getClientId()); // 【: 8】.... ConnectActionListener connectActionListener = new ConnectActionListener(this, persistence, comms, options, userToken, userContext, callback, reconnecting); ionCallback(connectActionListener); rContext(this); // If we are using the MqttCallbackExtended, set it on the connectActionListener if(llback instanceof MqttCallbackExtended){ tCallbackExtended((MqttCallbackExtended)llback); } workModuleIndex(0); t(); return userToken; } ........ public IMqttToken disconnect(long quiesceTimeout, Object userContext, IMqttActionListener callback) throws MqttException { final String methodName = "disconnect"; MqttToken token = new MqttToken(getClientId()); ionCallback(callback); rContext(userContext); MqttDisconnect disconnect = new MqttDisconnect(); try { nect(disconnect, quiesceTimeout, token); } catch (MqttException ex) { throw ex; } return token; }
public String getCurrentServerURI(){ return workModules()[workModuleIndex()].getServerURI(); } protected MqttTopic getTopic(String topic) { te(topic, false/*wildcards NOT allowed*/); MqttTopic result = (MqttTopic)(topic); if (result == null) { result = new MqttTopic(topic, comms); (topic,result); } return result; }
public IMqttToken checkPing(Object userContext, IMqttActionListener callback) throws MqttException{ // 【: 9】 final String methodName = "ping"; MqttToken token;
token = orActivity();
return token; }
.........
public IMqttToken subscribe(String[] topicFilters, int[] qos, Object userContext, IMqttActionListener callback) throws MqttException { final String methodName = "subscribe"; if ( != ) { throw new IllegalArgumentException(); }
// remove any message handlers for individual topics
for (int i = 0; i < ; ++i) { MessageListener(topicFilters[i]); }
String subs = ""; for (int i=0;i<;i++) { if (i>0) { subs+=", "; } subs+= "topic="+ topicFilters[i]+" qos="+qos[i];
//Check if the topic filter is valid before subscribing te(topicFilters[i], true/*allow wildcards*/); } //@TRACE 106=Subscribe topicFilter={0} userContext={1} callback={2} (CLASS_NAME,methodName,"106",new Object[]{subs, userContext, callback}); MqttToken token = new MqttToken(getClientId()); ionCallback(callback); rContext(userContext); ics(topicFilters); MqttSubscribe register = new MqttSubscribe(topicFilters, qos); Wait(register, token); //@TRACE 109=< (CLASS_NAME,methodName,"109");
return token; }
/* (non-Javadoc) * @see syncClient#unsubscribe(, , ctionListener) */ public IMqttToken unsubscribe(String topicFilter, Object userContext, IMqttActionListener callback) throws MqttException { return unsubscribe(new String[] {topicFilter}, userContext, callback); } public IMqttToken unsubscribe(String[] topicFilters, Object userContext, IMqttActionListener callback) throws MqttException { final String methodName = "unsubscribe"; String subs = ""; for (int i=0;i<;i++) { if (i>0) { subs+=", "; } subs+=topicFilters[i];
// Check if the topic filter is valid before unsubscribing // Although we already checked when subscribing, but invalid // topic filter is meanless for unsubscribing, just prohibit it // to reduce unnecessary control packet send to broker. te(topicFilters[i], true/*allow wildcards*/); }
// remove message handlers from the list for this client for (int i = 0; i < ; ++i) { MessageListener(topicFilters[i]); } MqttToken token = new MqttToken(getClientId()); ionCallback(callback); rContext(userContext); ics(topicFilters); MqttUnsubscribe unregister = new MqttUnsubscribe(topicFilters); Wait(unregister, token); return token; } public void setCallback(MqttCallback callback) { llback = callback; lback(callback); }
public void setManualAcks(boolean manualAcks) { ualAcks(manualAcks); }
public void messageArrivedComplete(int messageId, int qos) throws MqttException { eArrivedComplete(messageId, qos); } public static String generateClientId() { // 【: 10】 //length of nanoTime = 15, so total length = 19 < 65535(defined in spec)
return CLIENT_ID_PREFIX + me(); } public IMqttDeliveryToken publish(String topic, MqttMessage message, Object userContext, IMqttActionListener callback) throws MqttException, MqttPersistenceException { final String methodName = "publish"; //Checks if a topic is valid when publishing a message. te(topic, false/*wildcards NOT allowed*/); MqttDeliveryToken token = new MqttDeliveryToken(getClientId()); ionCallback(callback); rContext(userContext); sage(message); ics(new String[] {topic}); MqttPublish pubMsg = new MqttPublish(topic, message); Wait(pubMsg, token); //@TRACE 112=< (CLASS_NAME,methodName,"112"); return token; } public void reconnect() throws MqttException { final String methodName = "reconnect"; //@Trace 500=Attempting to reconnect client: {0} (CLASS_NAME, methodName, "500", new Object[]{Id}); // Some checks to make sure that we're not attempting to reconnect an already connected client if (ected()) { throw MqttException(_CODE_CLIENT_CONNECTED); } if (ecting()) { throw new MqttException(_CODE_CONNECT_IN_PROGRESS); } if (onnecting()) { throw new MqttException(_CODE_CLIENT_DISCONNECTING); }
} /** * Sends a message to the server. Does not check if connected this validation must be done * by invoking routines. * @param message * @param token * @throws MqttException */ void internalSend(MqttWireMessage message, MqttToken token) throws MqttException { if (ent() == null ) { // Associate the client with the token - also marks it as in use. ent(getClient()); } else { // Token is already in use - cannot reuse //@TRACE 213=fail: token in use: key={0} message={1} token={2} (CLASS_NAME, methodName, "213", new Object[]{(), message, token}); throw new MqttException(_CODE_TOKEN_INUSE); } try { // Persist if needed and send the message (message, token); // 【: 14】 } catch(MqttException e) { if (message instanceof MqttPublish) { ((MqttPublish)message); } throw e; } } /** * Sends a message to the broker if in connected state, but only waits for the message to be * stored, before returning. */ public void sendNoWait(MqttWireMessage message, MqttToken token) throws MqttException { // 【: 15】 final String methodName = "sendNoWait"; if (isConnected() || (!isConnected() && message instanceof MqttConnect) || (isDisconnecting() && message instanceof MqttDisconnect)) { if(disconnectedMessageBuffer != null && sageCount() != 0){ tBufferedMessage(message); sage(message, token); } else { alSend(message, token); } } else if(disconnectedMessageBuffer != null && isResting()){ tBufferedMessage(message); sage(message, token); } else { throw MqttException(_CODE_CLIENT_NOT_CONNECTED); } } /** * Sends a connect message and waits for an ACK or NACK. * Connecting is a special case which will also start up the * network connection, receive thread, and keep alive thread. */ public void connect(MqttConnectOptions options, MqttToken token) throws MqttException { final String methodName = "connect"; synchronized (conLock) { if (isDisconnected() && !closePending) { conState = CONNECTING; conOptions = options; MqttConnect connect = new MqttConnect(entId(), tVersion(), nSession(), pAliveInterval(), rName(), sword(), lMessage(), lDestination()); pAliveSecs(pAliveInterval()); anSession(nSession()); Inflight(Inflight()); (); ConnectBG conbg = new ConnectBG(this, token, connect); (); } else { if (isClosed() || closePending) { throw new MqttException(_CODE_CLIENT_CLOSED); } else if (isConnecting()) { throw new MqttException(_CODE_CONNECT_IN_PROGRESS); } else if (isDisconnecting()) { throw new MqttException(_CODE_CLIENT_DISCONNECTING); } else { throw MqttException(_CODE_CLIENT_CONNECTED); } } }
} public void connectComplete( MqttConnack cack, MqttException mex) throws MqttException { final String methodName = "connectComplete"; int rc = urnCode(); synchronized (conLock) { if (rc == 0) { conState = CONNECTED; return; } } throw mex; } // Tidy up. There may be tokens outstanding as the client was // not disconnected/quiseced cleanly! Work out what tokens still // need to be notified and waiters unblocked. Store the // disconnect or connect token to notify after disconnect is // complete. private MqttToken handleOldTokens(MqttToken token, MqttException reason) { // 【: 16】 MqttToken tokToNotifyLater = null; try { // First the token that was related to the disconnect / shutdown may // not be in the token table - temporarily add it if not if (token != null) { if (en(())==null) { ken(token, ()); } } Vector toksToNot = eOldTokens(reason); Enumeration toksToNotE = ts(); while(eElements()) { MqttToken tok = (MqttToken)ement(); if (().equals() || ().equals()) { // Its con or discon so remember and notify @ end of disc routine tokToNotifyLater = tok; } else { // notify waiters and callbacks of outstanding tokens // that a problem has occurred and disconnect is in // progress perationComplete(tok); } } }catch(Exception ex) { // Ignore as we are shutting down } return tokToNotifyLater; }
public void messageArrivedComplete(int messageId, int qos) throws MqttException { eArrivedComplete(messageId, qos); } // Kick off the connect processing in the background so that it does not block. For instance // the socket could take time to create. private class ConnectBG implements Runnable { ClientComms clientComms = null; Thread cBg = null; MqttToken conToken; MqttConnect conPacket; ConnectBG(ClientComms cc, MqttToken cToken, MqttConnect cPacket) { clientComms = cc; conToken = cToken; conPacket = cPacket; cBg = new Thread(this, "MQTT Con: "+getClient().getClientId()); } void start() { (); } public void run() { // 【: 17】 final String methodName = "connectBG:run"; MqttException mqttEx = null; //@TRACE 220=> (CLASS_NAME, methodName, "220"); try { // Reset an exception on existing delivery tokens. // This will have been set if disconnect occured before delivery was // fully processed. MqttDeliveryToken[] toks = standingDelTokens(); for (int i=0; i<; i++) { toks[i].eption(null); } // Save the connect token in tokenStore as failure can occur before send ken(conToken,conPacket); // Connect to the server at the network TCP socket and then // start the background processing threads before sending the connect
public class ClientState { private static final int MIN_MSG_ID = 1; // Lowest possible MQTT message ID to use private static final int MAX_MSG_ID = 65535; // Highest possible MQTT message ID to use private int nextMsgId = MIN_MSG_ID - 1; // The next available message ID to use private Hashtable inUseMsgIds; // Used to store a set of in-use message IDs volatile private Vector pendingMessages; volatile private Vector pendingFlows;
private CommsTokenStore tokenStore; private ClientComms clientComms = null; private CommsCallback callback = null; private long keepAlive; private boolean cleanSession; private MqttClientPersistence persistence;
private int maxInflight = 0;
private int actualInFlight = 0; private int inFlightPubRels = 0;
private Object queueLock = new Object(); private Object quiesceLock = new Object(); private boolean quiescing = false;
private long lastOutboundActivity = 0; private long lastInboundActivity = 0; private long lastPing = 0; private MqttWireMessage pingCommand; private Object pingOutstandingLock = new Object(); private int pingOutstanding = 0; private boolean connected = false;
private Hashtable outboundQoS2 = null; private Hashtable outboundQoS1 = null; private Hashtable outboundQoS0 = null; private Hashtable inboundQoS2 = null;
private MqttPingSender pingSender = null; protected ClientState(MqttClientPersistence persistence, CommsTokenStore tokenStore,
CommsCallback callback, ClientComms clientComms, MqttPingSender pingSender) throws MqttException {
ourceName(ent().getClientId()); (CLASS_NAME, "
tence = persistence; ck = callback; tore = tokenStore; Comms = clientComms; nder = pingSender;
restoreState(); }先有个类的概念我拆出来看。
/** * Submits a message for delivery. This method will block until there is * room in the inFlightWindow for the message. The message is put into * persistence before returning. *
* @param message the message to send * @param token the token that can be used to track delivery of the message * @throws MqttException */ public void send(MqttWireMessage message, MqttToken token) throws MqttException { final String methodName = "send"; if (ageIdRequired() && (sageId() == 0)) { sageId(getNextMessageId()); } if (token != null ) { try { sageID(sageId()); } catch (Exception e) { } }
if (message instanceof MqttPublish) { synchronized (queueLock) { if (actualInFlight >= light) { throw new MqttException(_CODE_MAX_INFLIGHT); } MqttMessage innerMessage = ((MqttPublish) message).getMessage(); switch(()) {
版权声明:本文标题:org.eclipse.paho.client.mqttv3源码解析(一)发送 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.roclinux.cn/b/1708772914a531001.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论