All predicatesShow sourcestomp.pl -- STOMP client.

This module provides a STOMP (Simple (or Streaming) Text Orientated Messaging Protocol) client. This client is based on work by Hongxin Liang. The current version is a major rewrite, both changing the API and the low-level STOMP frame (de)serialization.

The predicate stomp_connection/5 is used to register a connection. The connection is established by stomp_connect/1, which is lazily called from any of the predicates that send a STOMP frame. After establishing the connection two threads are created. One receives STOMP frames and the other manages and watches the heart beat.

Threading

Upon receiving a frame the callback registered with stomp_connection/5 is called in the context of the receiving thread. More demanding applications may decide to send incomming frames to a SWI-Prolog message queue and have one or more worker threads processing the input. Alternatively, frames may be inspected by the receiving thread and either processed immediately or be dispatched to either new or running threads. The best scenario depends on the message rate, processing time and concurrency of the Prolog application.

All message sending predicates of this library are thread safe. If two threads send a frame to the same connection the library ensures that both frames are properly serialized.

Reconnecting

By default this library tries to establish the connection and the user gets notified by means of a disconnected pseudo frame that the connection is lost. Using the Options argument of stomp_connection/6 the system can be configured to try and keep connecting if the server is not available and reconnect if the connection is lost. See the pong.pl example.

author
- Hongxin Liang and Jan Wielemaker
See also
- http://stomp.github.io/index.html
- https://github.com/jasonrbriggs/stomp.py
license
- BSD-2
To be done
- TSL support
Source stomp_connection(+Address, +Host, +Headers, :Callback, -Connection) is det
Source stomp_connection(+Address, +Host, +Headers, :Callback, -Connection, +Options) is det
Create a connection reference. The connection is not set up yet by this predicate. Callback is called on any received frame except for heart beat frames as below.
call(Callback, Command, Connection, Header, Body)

Where command is one of the commands below. Header is a dict holding the STOMP frame header, where all values are strings except for the 'content-length' key value which is passed as an integer.

Body is a string or, if the data is of the type application/json, a dict.

connected
A connection was established. Connection and Header are valid.
disconnected
The connection was lost. Only Connection is valid.
message
A message arrived. All three arguments are valid. Body is a dict if the content-type of the message is application/json and a string otherwise.
heartbeat
A heartbeat was received. Only Connection is valid. This callback is also called for each newline that follows a frame. These newlines can be a heartbeat, but can also be additional newlines that follow a frame.
heartbeat_timeout
No heartbeat was received. Only Connection is valid.
error
An error happened. All three arguments are valid and handled as message.

Note that stomp_teardown/1 causes the receiving and heartbeat thread to be signalled with abort/0.

Options processed:

reconnect(+Bool)
Try to reestablish the connection to the server if the connection is lost. Default is false
connect_timeout(+Seconds)
Maximum time to try and reestablish a connection. The default is 600 (10 minutes).
json_options(+Options)
Options passed to json_read_dict/3 to translate application/json content to Prolog. Default is [].
Arguments:
Address- is a valid address for tcp_connect/3. Normally a term Host:Port, e.g., localhost:32772.
Host- is a path denoting the STOMP host. Often just /.
Headers- is a dict with STOMP headers used for the CONNECT request.
Connection- is an opaque ground term that identifies the connection.
Source stomp_connection_property(?Connection, ?Property) is nondet
True when Property, is a property of Connection. Defined properties are:
address(Address)
callback(Callback)
host(Host)
headers(Headers)
reconnect(Bool)
connect_timeout(Seconds)
All the above properties result from the stomp_connection/6 registration.
receiver_thread_id(Thread)
stream(Stream)
heartbeat_thread_id(Thread)
received_heartbeat(TimeStamp)
These describe an active STOMP connection.
Source stomp_destroy_connection(+Connection)
Destroy a connection. If it is active, first use stomp_teardown/1 to disconnect.
Source stomp_setup(+Connection, +Options) is det
Set up the actual socket connection and start receiving thread. This is a no-op if the connection has already been created. The Options processed are below. Other options are passed to tcp_connect/3.
timeout(+Seconds)
If tcp_connect/3 fails, retry until the timeout is reached. If Seconds is inf or infinite, keep retrying forever.
Source connect(+Connection, +Address, -Stream, +Options) is det[private]
Connect to Address. If the option timeout(Sec) is present, retry the connection until the timeout is reached.
Source stomp_teardown(+Connection) is semidet
Tear down the socket connection, stop receiving thread and heartbeat thread (if applicable). The registration of the connection as created by stomp_connection/5 is preserved and the connection may be reconnected using stomp_connect/1.
Source stomp_reconnect(+Connection) is det
Teardown the connection and try to reconnect.
Source stomp_connect(+Connection) is det
Source stomp_connect(+Connection, +Options) is det
Setup the connection. First ensures a socket connection and if this is new send a CONNECT frame. Protocol version and heartbeat negotiation will be handled. STOMP frame is not used for backward compatibility.

This predicate waits for the connection handshake to have completed. Currently it waits for a maximum of 10 seconds after establishing the socket for the server reply.

Calling this on an established connection has no effect.

Errors
- stomp_error(connect, Connection, Detail) if no connection could be established.
See also
- http://stomp.github.io/stomp-specification-1.2.html#CONNECT_or_STOMP_Frame).
Source stomp_deadline(+Connection, -Deadline, +Options) is det[private]
True when there is a connection timeout and Deadline is the deadline for establishing a connection. Deadline is one of
Number
The deadline as a time stamp
infinite
Keep trying
once
Try to connect once.
Source stomp_send(+Connection, +Destination, +Headers, +Body) is det
Send a SEND frame. If content-type is not provided, text/plain will be used. content-length will be filled in automatically.
See also
- http://stomp.github.io/stomp-specification-1.2.html#SEND
Source stomp_send_json(+Connection, +Destination, +Headers, +JSON) is det
Send a SEND frame. JSON can be either a JSON term or a dict. content-type is filled in automatically as application/json and content-length will be filled in automatically as well.
See also
- http://stomp.github.io/stomp-specification-1.2.html#SEND
Source stomp_subscribe(+Connection, +Destination, +Id, +Headers) is det
Send a SUBSCRIBE frame.
See also
- http://stomp.github.io/stomp-specification-1.2.html#SUBSCRIBE
Source stomp_unsubscribe(+Connection, +Id) is det
Send an UNSUBSCRIBE frame.
See also
- http://stomp.github.io/stomp-specification-1.2.html#UNSUBSCRIBE
Source stomp_ack(+Connection, +MessageId, +Headers) is det
Send an ACK frame. See stomp_ack/2 for simply passing the header received with the message we acknowledge.
See also
- http://stomp.github.io/stomp-specification-1.2.html#ACK
Source stomp_nack(+Connection, +MessageId, +Headers) is det
Send a NACK frame. See stomp_nack/2 for simply passing the header received with the message we acknowledge.
See also
- http://stomp.github.io/stomp-specification-1.2.html#NACK
Source stomp_ack(+Connection, +MsgHeader) is det
Source stomp_nack(+Connection, +MsgHeader) is det
Reply with an ACK or NACK based on the received message header. On a STOMP 1.1 request we get an ack field in the header and reply with an id. For STOMP 1.2 we reply with the message-id and subscription that we received with the message.
Source stomp_begin(+Connection, +Transaction) is det
Send a BEGIN frame. @see http://stomp.github.io/stomp-specification-1.2.html#BEGIN
Source stomp_commit(+Connection, +Transaction) is det
Send a COMMIT frame.
See also
- http://stomp.github.io/stomp-specification-1.2.html#COMMIT
Source stomp_abort(+Connection, +Transaction) is det
Send a ABORT frame.
See also
- http://stomp.github.io/stomp-specification-1.2.html#ABORT
Source stomp_transaction(+Connection, :Goal) is semidet
Run Goal as once/1, tagging all SEND messages inside the transaction with the transaction id. If Goal fails or raises an exception the transaction is aborted. Failure or exceptions cause the transaction to be aborted using stomp_abort/2, after which the result is forwarded.
Source stomp_disconnect(+Connection, +Headers) is det
Send a DISCONNECT frame. If the connection has the reconnect property set to true, this will be reset to disconnected to avoid reconnecting. A subsequent stomp_connect/2 resets the reconnect status.
See also
- http://stomp.github.io/stomp-specification-1.2.html#DISCONNECT
Source send_frame(+Connection, +Command, +Headers) is det[private]
Source send_frame(+Connection, +Command, +Headers, +Body) is det[private]
Send a frame. If no connection is established connect first. If the sending results in an I/O error, disconnect, reconnect and try again if the reconnect propertys is set.
Source connection_stream(+Connection, -Stream)[private]
Source read_frame(+Connection, +Stream, -Frame) is det[private]
Read a frame from a STOMP stream. On end-of-file, Frame is unified with the atom end_of_file. Otherwise it is a dict holding the cmd, headers (another dict) and body (a string)
Source read_content(+Connection, +Stream, +Header, -Content) is det[private]
Read the body. Note that the body may be followed by an arbitrary number of newlines. We leave them in place to avoid blocking.
Source receive(+Connection, +Stream) is det[private]
Read and process incoming messages from Stream.
Source receive_done(+Connection, +Why)[private]
The receiver thread needs to close the connection due to reading end-of-file, an I/O error or failure to parse a frame. If connection is configured to be restarted this thread will try to reestablish the connection. After reestablishing the connection this receiver thread terminates.
Source process_frame(+Connection, +Frame) is det[private]
Process an incoming frame.
Source notify(+Connection, +FrameType) is det[private]
Source notify(+Connection, +FrameType, +Header) is det[private]
Source notify(+Connection, +FrameType, +Header, +Body) is det[private]
Call the callback using FrameType.

Re-exported predicates

The following predicates are exported from this file while their implementation is defined in imported modules or non-module files loaded by this module.

Source stomp_connection(+Address, +Host, +Headers, :Callback, -Connection) is det
Source stomp_connection(+Address, +Host, +Headers, :Callback, -Connection, +Options) is det
Create a connection reference. The connection is not set up yet by this predicate. Callback is called on any received frame except for heart beat frames as below.
call(Callback, Command, Connection, Header, Body)

Where command is one of the commands below. Header is a dict holding the STOMP frame header, where all values are strings except for the 'content-length' key value which is passed as an integer.

Body is a string or, if the data is of the type application/json, a dict.

connected
A connection was established. Connection and Header are valid.
disconnected
The connection was lost. Only Connection is valid.
message
A message arrived. All three arguments are valid. Body is a dict if the content-type of the message is application/json and a string otherwise.
heartbeat
A heartbeat was received. Only Connection is valid. This callback is also called for each newline that follows a frame. These newlines can be a heartbeat, but can also be additional newlines that follow a frame.
heartbeat_timeout
No heartbeat was received. Only Connection is valid.
error
An error happened. All three arguments are valid and handled as message.

Note that stomp_teardown/1 causes the receiving and heartbeat thread to be signalled with abort/0.

Options processed:

reconnect(+Bool)
Try to reestablish the connection to the server if the connection is lost. Default is false
connect_timeout(+Seconds)
Maximum time to try and reestablish a connection. The default is 600 (10 minutes).
json_options(+Options)
Options passed to json_read_dict/3 to translate application/json content to Prolog. Default is [].
Arguments:
Address- is a valid address for tcp_connect/3. Normally a term Host:Port, e.g., localhost:32772.
Host- is a path denoting the STOMP host. Often just /.
Headers- is a dict with STOMP headers used for the CONNECT request.
Connection- is an opaque ground term that identifies the connection.
Source stomp_connect(+Connection) is det
Source stomp_connect(+Connection, +Options) is det
Setup the connection. First ensures a socket connection and if this is new send a CONNECT frame. Protocol version and heartbeat negotiation will be handled. STOMP frame is not used for backward compatibility.

This predicate waits for the connection handshake to have completed. Currently it waits for a maximum of 10 seconds after establishing the socket for the server reply.

Calling this on an established connection has no effect.

Errors
- stomp_error(connect, Connection, Detail) if no connection could be established.
See also
- http://stomp.github.io/stomp-specification-1.2.html#CONNECT_or_STOMP_Frame).
Source stomp_ack(+Connection, +MsgHeader) is det
Source stomp_nack(+Connection, +MsgHeader) is det
Reply with an ACK or NACK based on the received message header. On a STOMP 1.1 request we get an ack field in the header and reply with an id. For STOMP 1.2 we reply with the message-id and subscription that we received with the message.