34
% Public interface of the STOMP client, one exported predicate per line.
:- module(stomp,
          [ % connection objects
            stomp_connection/5,
            stomp_connection/6,
            stomp_connection_property/2,
            stomp_destroy_connection/1,
            % connecting and disconnecting
            stomp_connect/1,
            stomp_connect/2,
            stomp_teardown/1,
            stomp_reconnect/1,
            % sending and subscribing
            stomp_send/4,
            stomp_send_json/4,
            stomp_subscribe/4,
            stomp_unsubscribe/2,
            stomp_ack/2,
            stomp_nack/2,
            stomp_ack/3,
            stomp_nack/3,
            stomp_transaction/2,
            stomp_disconnect/2,
            % low-level frames
            stomp_begin/2,
            stomp_commit/2,
            stomp_abort/2,
            stomp_setup/2
          ]).
106
% The connection callback is called with four additional arguments
% (see notify/4); the transaction goal is called as is.
:- meta_predicate stomp_connection(+, +, +, 4, -).
:- meta_predicate stomp_connection(+, +, +, 4, -, +).
:- meta_predicate stomp_transaction(+, 0).
112:- use_module(library(apply)). 113:- use_module(library(debug)). 114:- use_module(library(error)). 115:- use_module(library(gensym)). 116:- use_module(library(http/http_stream)). 117:- use_module(library(http/json)). 118:- use_module(library(readutil)). 119:- use_module(library(socket)). 120:- use_module(library(uuid)). 121:- use_module(library(lists)). 122:- use_module(library(option)). 123:- use_module(library(time)). 124
% connection_property(Connection, Name, Value) holds all per-connection
% state, both configuration and run-time handles.
:- dynamic connection_property/3.
189
%!  stomp_connection(+Address, +Host, +Headers, :Callback, -Connection) is det.
%!  stomp_connection(+Address, +Host, +Headers, :Callback, -Connection,
%!                   +Options) is det.
%
%   Create a connection record identified by a fresh UUID and store
%   its configuration.  Options read here are reconnect(Bool) (default
%   `false`), connect_timeout(Seconds) (default 600) and
%   json_options(List) (default `[]`).  This only stores properties.

stomp_connection(Address, Host, Headers, Callback, Connection) :-
    stomp_connection(Address, Host, Headers, Callback, Connection, []).

stomp_connection(Address, Host, Headers, Callback, Connection, Options) :-
    option(reconnect(AutoReconnect), Options, false),
    option(connect_timeout(ConnectTimeout), Options, 600),
    option(json_options(JSONOpts), Options, []),
    valid_address(Address),
    must_be(atom, Host),
    must_be(dict, Headers),
    must_be(callable, Callback),
    uuid(Connection),
    retractall(connection_property(Connection, _, _)),
    Config = _{ address:         Address,
                callback:        Callback,
                host:            Host,
                headers:         Headers,
                reconnect:       AutoReconnect,
                connect_timeout: ConnectTimeout,
                json_options:    JSONOpts
              },
    update_connection_mapping(Connection, Config).
213
%!  valid_address(@Address) is det.
%
%   True when Address is of the form Host:Port with an atom Host and
%   an integer Port.
%
%   @error type_error(stomp_address, Address) if Address is not a
%          Host:Port term; must_be/2 errors for a bad Host or Port.

valid_address(Host:Port) :-
    !,
    must_be(atom, Host),
    must_be(integer, Port).
valid_address(Address) :-
    type_error(stomp_address, Address).
220
%!  connection_property(?Name) is nondet.
%
%   Names of the properties that reset_connection_properties/1 keeps
%   when a connection is torn down.  `json_options` is listed because
%   stomp_connection/6 stores it as configuration and read_content/5
%   needs it again after a reconnect.

connection_property(address).
connection_property(callback).
connection_property(host).
connection_property(headers).
connection_property(reconnect).
connection_property(connect_timeout).
connection_property(json_options).
227
246
%!  stomp_connection_property(?Connection, ?Property) is nondet.
%
%   Property is a term Name(Value).  When Property is unbound all
%   stored properties are enumerated; otherwise it must be compound
%   and the named property is looked up.

stomp_connection_property(Connection, Property) :-
    (   var(Property)
    ->  connection_property(Connection, Key, Val),
        Property =.. [Key,Val]
    ;   must_be(compound, Property),
        Property =.. [Key,Val],
        query_connection_property(Connection, Key, Val)
    ).
256
261
%!  stomp_destroy_connection(+Connection) is det.
%
%   Tear down Connection and remove all its properties.
%
%   @error existence_error(stomp_connection, Connection) if no
%          `address` property is stored for Connection.

stomp_destroy_connection(Connection) :-
    must_be(ground, Connection),
    (   query_connection_property(Connection, address, _)
    ->  true
    ;   existence_error(stomp_connection, Connection)
    ),
    stomp_teardown(Connection),
    retractall(connection_property(Connection, _, _)).
269
279
%!  stomp_setup(+Connection, +Options) is det.
%
%   Make sure Connection has a socket stream and a receiver thread.

stomp_setup(Connection, Options) :-
    stomp_setup(Connection, _IsNew, Options).

%   stomp_setup(+Connection, -New, +Options)
%
%   New is `false` if a stream was already present and `true` if this
%   call created it.  The unlocked test is repeated under the `stomp`
%   mutex by stomp_setup_guarded/3.

stomp_setup(Connection, false, _) :-
    query_connection_property(Connection, stream, _),
    !.
stomp_setup(Connection, New, Options) :-
    with_mutex(stomp, stomp_setup_guarded(Connection, New, Options)).

stomp_setup_guarded(Connection, false, _) :-
    query_connection_property(Connection, stream, _),
    !.
stomp_setup_guarded(Connection, true, Options) :-
    query_connection_property(Connection, address, Address),
    connect(Connection, Address, Socket, Options),
    set_stream(Socket, encoding(utf8)),
    gensym(stomp_receive, Alias),
    thread_create(receive(Connection, Socket), Receiver, [alias(Alias)]),
    debug(stomp(connection), 'Handling input on thread ~p', [Receiver]),
    Handles = _{ receiver_thread_id: Receiver,
                 stream:             Socket
               },
    update_connection_mapping(Connection, Handles).
303
308
%   connect(+Connection, +Address, -Stream, +Options)
%
%   Open a TCP stream to Address, honouring the deadline computed by
%   stomp_deadline/3 from Options and the connection settings.

connect(Connection, Address, Stream, Options) :-
    stomp_deadline(Connection, Limit, Options),
    connect_with_deadline(Connection, Address, Stream, Limit, Options).
312
%   connect_with_deadline(+Connection, +Address, -Stream, +Deadline, +Options)
%
%   Open a TCP connection to Address.  Deadline is one of
%
%     - `once`: a single tcp_connect/3 attempt, no retry.
%     - a number (time stamp): retry until that time; each attempt is
%       limited to the time remaining.
%     - anything else (e.g. `infinite`): retry forever.
%
%   Retrying is a failure-driven loop: wait_retry/4 sleeps and fails
%   back into between/3, or throws when the deadline has passed.
%
%   @error stomp_error(connect, Connection, timeout) (wrapped in
%          error/2) when a numeric Deadline has passed.

connect_with_deadline(_Connection, Address, Stream, once, Options) :-
    !,
    tcp_connect(Address, Stream, Options).
connect_with_deadline(Connection, Address, Stream, Deadline, Options) :-
    number(Deadline),
    !,
    between(0, infinite, Retry),
      get_time(Now),
      Timeout is Deadline-Now,
      (   Timeout > 0                   % time left before the deadline?
      ->  (   catch(call_with_time_limit(
                        Timeout,
                        tcp_connect(Address, Stream, Options)),
                    Error,
                    true)
          ->  (   var(Error)
              ->  !                     % connected: leave the retry loop
              ;   (   debugging(stomp(connection))
                  ->  print_message(warning, Error)
                  ;   true
                  ),
                  wait_retry(Connection, Error, Retry, Deadline)
              )
          ;   wait_retry(Connection, failed, Retry, Deadline)
          )
      ;   throw(error(stomp_error(connect, Connection, timeout), _))
      ).
connect_with_deadline(Connection, Address, Stream, Deadline, Options) :-
    between(0, infinite, Retry),
      Error = error(Formal, _),
      (   catch(tcp_connect(Address, Stream, Options),
                Error,
                true)
      ->  (   var(Formal)
          ->  !                         % connected: leave the retry loop
          ;   (   debugging(stomp(connection))
              ->  print_message(warning, Error)
              ;   true
              ),
              wait_retry(Connection, Formal, Retry, Deadline)
          )
      ;   wait_retry(Connection, failed, Retry, Deadline)
      ).
356
%   wait_retry(+Connection, +Why, +Retry, +Deadline)
%
%   Back off before the next attempt of a failure-driven retry loop.
%   Never succeeds: it sleeps and fails so the caller backtracks into
%   its retry generator, or it throws.
%
%     - If Why is a connect error carrying an error(_) detail for this
%       Connection, it is re-thrown immediately.
%     - Otherwise wait 0.1*2^Retry seconds, capped at 10 seconds and at
%       the time left until a numeric Deadline.
%
%   @error stomp_error(connect, Connection, timeout) (wrapped in
%          error/2) when no time is left.

wait_retry(Connection, Why, _Retry, _Deadline) :-
    Why = error(stomp_error(connect, Connection, error(_)), _),
    !,
    throw(Why).
wait_retry(Connection, _Why, Retry, Deadline) :-
    % 0.1*2^7 already exceeds the 10 second cap.  Clamping the shift
    % gives the same result and keeps 1<<Retry from growing without
    % bound, which ends in a float overflow on long retry loops.
    Shift is min(Retry, 7),
    Wait0 is min(10, 0.1 * (1<<Shift)),
    (   number(Deadline)
    ->  get_time(Now),
        Wait is min(Deadline-Now, Wait0)
    ;   Wait = Wait0
    ),
    (   Wait > 0
    ->  sleep(Wait),
        fail
    ;   throw(error(stomp_error(connect, Connection, timeout), _))
    ).
373
374
381
%!  stomp_teardown(+Connection) is det.
%
%   Stop the receiver and heartbeat threads, force-close the stream
%   and drop every property that is not listed in
%   connection_property/1.

stomp_teardown(Connection) :-
    maplist(terminate_helper(Connection),
            [ receiver_thread_id,
              heartbeat_thread_id
            ]),
    forall(query_connection_property(Connection, stream, Socket),
           close(Socket, [force(true)])),
    debug(stomp(connection), 'retract connection mapping for ~p', [Connection]),
    reset_connection_properties(Connection).

%   terminate_helper(+Connection, +Helper)
%
%   Remove the thread id stored under Helper.  Unless it is the
%   calling thread, the thread is sent `abort` and joined.  Succeeds
%   silently if there is nothing to stop or signalling raises an error.

terminate_helper(Connection, Helper) :-
    (   retract(connection_property(Connection, Helper, Tid)),
        \+ thread_self(Tid),
        catch(thread_signal(Tid, abort), error(_,_), fail)
    ->  thread_join(Tid, _Status)
    ;   true
    ).

%   reset_connection_properties(+Connection)
%
%   Retract all properties of Connection whose name is not declared
%   by connection_property/1.

reset_connection_properties(Connection) :-
    findall(Name,
            ( query_connection_property(Connection, Name, _),
              \+ connection_property(Name)
            ),
            Transient),
    forall(member(Name, Transient),
           retractall(connection_property(Connection, Name, _))).
405
409
%!  stomp_reconnect(+Connection) is det.
%
%   Tear down Connection and connect it again.

stomp_reconnect(Connection) :-
    stomp_teardown(Connection),
    stomp_connect(Connection).

%!  stomp_connect(+Connection) is det.
%!  stomp_connect(+Connection, +Options) is det.
%
%   Connect using the deadline derived from Options and the stored
%   connection settings (see stomp_deadline/3).

stomp_connect(Connection) :-
    stomp_connect(Connection, []).

stomp_connect(Connection, Options) :-
    update_reconnect_property(Connection),
    stomp_deadline(Connection, Limit, Options),
    stomp_deadline_connect(Connection, Limit, Options).

%   A `reconnect` value of `disconnected` (set by stomp_disconnect/2)
%   is turned back into `true` on an explicit connect.

update_reconnect_property(Connection) :-
    (   query_connection_property(Connection, reconnect, disconnected)
    ->  update_connection_property(Connection, reconnect, true)
    ;   true
    ).
445
%   stomp_deadline_connect(+Connection, +Deadline, +Options)
%
%   Failure-driven retry loop around stomp_setup/3 and the CONNECT
%   handshake.  If this call created the stream it performs the
%   handshake itself; otherwise it waits until the `connected`
%   property appears.  On trouble the connection is torn down and
%   wait_retry/4 either fails back into between/3 or throws.

stomp_deadline_connect(Connection, Deadline, Options) :-
    between(0, infinite, Attempt),
      stomp_setup(Connection, Created, [deadline(Deadline)|Options]),
      (   Created == true
      ->  Caught = error(Formal, _),
          catch(connect_handshake(Connection), Caught, true),
          (   var(Formal)
          ->  !                         % handshake done: stop retrying
          ;   stomp_teardown(Connection),
              wait_retry(Connection, Caught, Attempt, Deadline)
          )
      ;   query_connection_property(Connection, connected, _)
      ->  true
      ;   wait_connected(Connection)
      ->  true
      ;   stomp_teardown(Connection),
          wait_retry(Connection, failed, Attempt, Deadline)
      ).
464
%   connect_handshake(+Connection)
%
%   Send the CONNECT frame and wait up to 10 seconds for a
%   stomp(connected(Connection, Status)) message, which
%   process_connect/3 sends to the thread stored as `waiting_thread`.
%
%   @error stomp_error(connect, Connection, Status) if Status is not
%          `true`, or stomp_error(connect, Connection, timeout).

connect_handshake(Connection) :-
    query_connection_property(Connection, headers, Headers),
    query_connection_property(Connection, host, Host),
    ConnectHeaders = Headers.put(_{ 'accept-version':'1.0,1.1,1.2',
                                    host:Host
                                  }),
    send_frame(Connection, connect, ConnectHeaders),
    (   Beat = Headers.get('heart-beat')
    ->  update_connection_property(Connection, 'heart-beat', Beat)
    ;   true
    ),
    thread_self(Me),
    update_connection_property(Connection, waiting_thread, Me),
    (   thread_get_message(Me, stomp(connected(Connection, Status)),
                           [timeout(10)])
    ->  (   Status == true
        ->  get_time(Stamp),
            update_connection_property(Connection, connected, Stamp)
        ;   throw(error(stomp_error(connect, Connection, Status), _))
        )
    ;   throw(error(stomp_error(connect, Connection, timeout), _))
    ).
488
%   wait_connected(+Connection)
%
%   Wait up to 10 seconds until the `connected` property is set for
%   Connection.
%
%   @error stomp_error(connect, Connection, timeout) if that does not
%          happen.

wait_connected(Connection) :-
    (   thread_wait(query_connection_property(Connection, connected, _),
                    [ timeout(10),
                      wait_preds([connection_property/3])
                    ])
    ->  true
    ;   throw(error(stomp_error(connect, Connection, timeout), _))
    ).
497
509
%   stomp_deadline(+Connection, -Deadline, +Options)
%
%   Determine the connect deadline, in order of preference:
%
%     1. deadline(Deadline) from Options, used as is.
%     2. timeout(Time) from Options, else the `connect_timeout`
%        property.  A number gives the time stamp now+Time; `inf` or
%        `infinite` gives `infinite`.
%     3. Otherwise `once`.

stomp_deadline(Connection, Deadline, Options) :-
    (   option(deadline(Deadline), Options)
    ->  true
    ;   (   option(timeout(Time), Options)
        ;   query_connection_property(Connection, connect_timeout, Time)
        )
    ->  (   number(Time)
        ->  get_time(Now),
            Deadline is Now+Time
        ;   must_be(oneof([inf,infinite]), Time),
            Deadline = infinite
        )
    ;   Deadline = once
    ).
525
526
534
%!  stomp_send(+Connection, +Destination, +Headers, +Body) is det.
%
%   Send a SEND frame to Destination.  When called inside
%   stomp_transaction/2 the transaction id is added to the headers.

stomp_send(Connection, Destination, Headers, Body) :-
    add_transaction(Headers, TxHeaders),
    FrameHeaders = TxHeaders.put(destination, Destination),
    send_frame(Connection, send, FrameHeaders, Body).

%!  stomp_send_json(+Connection, +Destination, +Headers, +JSON) is det.
%
%   As stomp_send/4, but the body is JSON serialised to a string with
%   atom_json_term/3 and the content type is `application/json`.

stomp_send_json(Connection, Destination, Headers, JSON) :-
    add_transaction(Headers, TxHeaders),
    atom_json_term(Text, JSON,
                   [ as(string),
                     width(0)
                   ]),
    FrameHeaders = TxHeaders.put(_{ destination:Destination,
                                    'content-type':'application/json'
                                  }),
    send_frame(Connection, send, FrameHeaders, Text).
558
564
%!  stomp_subscribe(+Connection, +Destination, +Id, +Headers) is det.
%
%   Send a SUBSCRIBE frame; `destination` and `id` are added to
%   Headers.

stomp_subscribe(Connection, Destination, Id, Headers) :-
    SubHeaders = Headers.put(_{destination:Destination, id:Id}),
    send_frame(Connection, subscribe, SubHeaders).

%!  stomp_unsubscribe(+Connection, +Id) is det.
%
%   Send an UNSUBSCRIBE frame for subscription Id.

stomp_unsubscribe(Connection, Id) :-
    send_frame(Connection, unsubscribe, _{id:Id}).

%!  stomp_ack(+Connection, +MessageId, +Headers) is det.
%
%   Send an ACK frame with `message-id` set to MessageId.

stomp_ack(Connection, MessageId, Headers) :-
    AckHeaders = Headers.put('message-id', MessageId),
    send_frame(Connection, ack, AckHeaders).

%!  stomp_nack(+Connection, +MessageId, +Headers) is det.
%
%   Send a NACK frame with `message-id` set to MessageId.

stomp_nack(Connection, MessageId, Headers) :-
    NackHeaders = Headers.put('message-id', MessageId),
    send_frame(Connection, nack, NackHeaders).
597
605
%!  stomp_ack(+Connection, +MessageHeader) is semidet.
%!  stomp_nack(+Connection, +MessageHeader) is semidet.
%
%   Acknowledge or reject a message given the header dict of the
%   received frame.

stomp_ack(Connection, Header) :-
    stomp_ack_nack(Connection, ack, Header).

stomp_nack(Connection, Header) :-
    stomp_ack_nack(Connection, nack, Header).

%   Use the `ack` header as `id` when present; otherwise pass on
%   `message-id` and `subscription`.  Fails if Header has neither.

stomp_ack_nack(Connection, Type, Header) :-
    (   AckId = Header.get(ack)
    ->  send_frame(Connection, Type, _{id: AckId})
    ;   Selected = _{'message-id':_, subscription:_},
        Selected :< Header
    ->  send_frame(Connection, Type, Selected)
    ).
619
620
625
%!  stomp_begin(+Connection, +Transaction) is det.
%!  stomp_commit(+Connection, +Transaction) is det.
%!  stomp_abort(+Connection, +Transaction) is det.
%
%   Send a BEGIN, COMMIT or ABORT frame for the transaction
%   identifier Transaction.

stomp_begin(Connection, Transaction) :-
    TxHeader = _{transaction:Transaction},
    send_frame(Connection, begin, TxHeader).

stomp_commit(Connection, Transaction) :-
    TxHeader = _{transaction:Transaction},
    send_frame(Connection, commit, TxHeader).

stomp_abort(Connection, Transaction) :-
    TxHeader = _{transaction:Transaction},
    send_frame(Connection, abort, TxHeader).
646
654
%!  stomp_transaction(+Connection, :Goal) is semidet.
%
%   Run Goal as once/1 inside a STOMP transaction with a fresh UUID.
%   The transaction is committed if Goal succeeds and aborted if it
%   fails or raises; failure and exceptions are passed on.

stomp_transaction(Connection, Goal) :-
    uuid(TxId),
    stomp_transaction(Connection, Goal, TxId).

%   This frame must stay on the stack while Goal runs:
%   in_transaction/1 finds the transaction id by looking for it as a
%   parent goal.

stomp_transaction(Connection, Goal, TransactionID) :-
    stomp_begin(Connection, TransactionID),
    (   catch(once(Goal), Ball, true)
    ->  (   var(Ball)
        ->  stomp_commit(Connection, TransactionID)
        ;   stomp_abort(Connection, TransactionID),
            throw(Ball)
        )
    ;   stomp_abort(Connection, TransactionID),
        fail
    ).

%   True when running below stomp_transaction/3; TransactionID is
%   taken from that parent goal.

in_transaction(TransactionID) :-
    prolog_current_frame(Here),
    prolog_frame_attribute(
        Here, parent_goal,
        stomp_transaction(_Connection, _Goal, TransactionID)).

%   Add a `transaction` header when inside a transaction; otherwise
%   the headers are returned unchanged.

add_transaction(Headers, Headers1) :-
    in_transaction(TxId),
    !,
    Headers1 = Headers.put(transaction, TxId).
add_transaction(Headers, Headers).
682
683
692
%!  stomp_disconnect(+Connection, +Headers) is det.
%
%   Send a DISCONNECT frame.  A `reconnect` value of `true` is first
%   changed to `disconnected`, so the `reconnect == true` tests in
%   send_frame/4 and receive_done/2 no longer trigger.

stomp_disconnect(Connection, Headers) :-
    (   query_connection_property(Connection, reconnect, true)
    ->  update_connection_property(Connection, reconnect, disconnected)
    ;   true
    ),
    send_frame(Connection, disconnect, Headers).
699
706
%   send_frame(+Connection, +Command, +Headers)
%   send_frame(+Connection, +Command, +Headers, +Body)
%
%   Send a frame; Body defaults to the empty string.  If writing
%   raises an error/2 exception the callback is told `disconnected`.
%   With `reconnect == true` the connection is then torn down and the
%   send is retried; otherwise the error is re-thrown.

send_frame(Connection, Command, Headers) :-
    send_frame(Connection, Command, Headers, "").

send_frame(Connection, Command, Headers, Body) :-
    Caught = error(Formal,_),
    catch(send_frame_guarded(Connection, Command, Headers, Body),
          Caught,
          true),
    (   var(Formal)
    ->  true
    ;   query_connection_property(Connection, reconnect, true)
    ->  notify(Connection, disconnected),
        stomp_teardown(Connection),
        debug(stomp(connection), 'Sending thread reconnects', []),
        send_frame(Connection, Command, Headers, Body)
    ;   notify(Connection, disconnected),
        throw(Caught)
    ).
725
%   send_frame_guarded(+Connection, +Command, +Headers, +Body)
%
%   Write one frame to the connection stream:
%
%     - Commands with a body (has_body/1) get a default
%       `content-type` of text/plain and a `content-length`.
%     - `heartbeat` is a bare newline.
%     - Other commands are command + headers + NUL.

send_frame_guarded(Connection, Command, Headers, Body) :-
    has_body(Command),
    !,
    connection_stream(Connection, Out),
    default_content_type('text/plain', Headers, TypedHeaders),
    body_bytes(Body, Length),
    FullHeaders = TypedHeaders.put('content-length', Length),
    with_output_to(Out,
                   ( send_command(Out, Command),
                     send_header(Out, FullHeaders),
                     format(Out, '~w\u0000\n', [Body]),
                     flush_output(Out))).
send_frame_guarded(Connection, heartbeat, _Headers, _Body) :-
    !,
    connection_stream(Connection, Out),
    nl(Out),
    flush_output(Out).
send_frame_guarded(Connection, Command, Headers, _Body) :-
    connection_stream(Connection, Out),
    with_output_to(Out,
                   ( send_command(Out, Command),
                     send_header(Out, Headers),
                     format(Out, '\u0000\n', []),
                     flush_output(Out))).
750
752
%   connection_stream(+Connection, -Stream)
%
%   Stream is the socket stream of Connection.  If none is stored,
%   stomp_connect/1 is called first.

connection_stream(Connection, Stream) :-
    (   query_connection_property(Connection, stream, Stream)
    ->  true
    ;   stomp_connect(Connection),
        query_connection_property(Connection, stream, Stream)
    ).

%   Write Command in upper case followed by a newline.

send_command(Stream, Command) :-
    string_upper(Command, Verb),
    format(Stream, '~w\n', [Verb]).
763
%   send_header(+Stream, +Headers)
%
%   Write each key-value pair of the dict Headers as a `Name:Value`
%   line, followed by the empty line that ends the header.

send_header(Stream, Headers) :-
    dict_pairs(Headers, _, Pairs),
    maplist(send_header_line(Stream), Pairs),
    nl(Stream).

%   send_header_line(+Stream, +NameValue)
%
%   Write one header line.  Numbers are written as is; any other
%   value goes through escape_value/2 first.

send_header_line(Stream, Name-Value) :-
    (   number(Value)
    ->  format(Stream, '~w:~w\n', [Name,Value])
    ;   escape_value(Value, String),
        format(Stream, '~w:~w\n', [Name,String])
    ).
775
%   escape_value(+Value, -String)
%
%   Escape a header value.  Carriage return, newline, colon and
%   backslash become `\r`, `\n`, `\c` and `\\`.  A value with none of
%   these is returned unchanged.  The fast-path test includes carriage
%   return so a value whose only special character is CR is escaped.

escape_value(Value, String) :-
    split_string(Value, "\r\n:\\", "", [_]),   % one field: nothing to escape
    !,
    String = Value.
escape_value(Value, String) :-
    string_codes(Value, Codes),
    phrase(escape(Codes), Encoded),
    string_codes(String, Encoded).

escape([]) --> [].
escape([H|T]) --> esc(H), escape(T).

esc(0'\r) --> !, "\\r".
esc(0'\n) --> !, "\\n".
esc(0':)  --> !, "\\c".
esc(0'\\) --> !, "\\\\".
esc(C)    --> [C].
793
%   default_content_type(+ContentType, +Header0, -Header)
%
%   Header is Header0, with `content-type` set to ContentType if
%   Header0 has no such key.

default_content_type(ContentType, Header0, Header) :-
    (   get_dict('content-type', Header0, _Present)
    ->  Header = Header0
    ;   put_dict('content-type', Header0, ContentType, Header)
    ).

%   body_bytes(+String, -Bytes)
%
%   Bytes is the byte count reported after writing String to a null
%   stream.

body_bytes(String, Bytes) :-
    setup_call_cleanup(
        open_null_stream(Null),
        ( write(Null, String),
          byte_count(Null, Bytes)
        ),
        close(Null)).
807
808
809 812
818
%   read_frame(+Connection, +Stream, -Frame)
%
%   Read the next frame from Stream.  Frame is `end_of_file`, or a
%   `stomp_frame` dict with key `cmd`, and `headers` and `body` where
%   applicable.

read_frame(Connection, Stream, Frame) :-
    read_command(Stream, Cmd),
    (   Cmd == end_of_file
    ->  Frame = end_of_file
    ;   Cmd == heartbeat
    ->  Frame = stomp_frame{cmd:heartbeat}
    ;   read_header(Stream, Hdr),
        (   has_body(Cmd)
        ->  read_content(Connection, Stream, Hdr, Payload),
            Frame = stomp_frame{cmd:Cmd, headers:Hdr, body:Payload}
        ;   Frame = stomp_frame{cmd:Cmd, headers:Hdr}
        )
    ).

%   Commands whose frames carry a body.

has_body(send).
has_body(message).
has_body(error).

%   read_command(+Stream, -Command)
%
%   Read the command line.  Command is `end_of_file`, `heartbeat`
%   for an empty line, or the line as a lower-case atom.

read_command(Stream, Command) :-
    read_line_to_string(Stream, Line),
    debug(stomp(command), 'Got line ~p', [Line]),
    (   Line == end_of_file
    ->  Command = end_of_file
    ;   Line == ""
    ->  Command = heartbeat
    ;   string_lower(Line, Lower),
        atom_string(Command, Lower)
    ).
847
%   read_header(+Stream, -Header)
%
%   Read header lines up to the empty line and return them as a dict
%   with tag `stomp_header`.

read_header(Stream, Header) :-
    read_header_lines(Stream, Pairs, []),
    dict_pairs(Header, stomp_header, Pairs).

%   read_header_lines(+Stream, -Pairs, +Seen)
%
%   Pairs is a list of Name-Value for the header lines read.  Each
%   line is split at its first colon.  Seen holds the names already
%   read: a repeated name is skipped, so the first occurrence wins.
%   Fails on a non-empty line without a colon.

read_header_lines(Stream, Header, Seen) :-
    read_line_to_string(Stream, Line),
    (   Line == ""
    ->  Header = []
    ;   sub_string(Line, Before, _, After, ":")
    ->  sub_atom(Line, 0, Before, _, Name),
        (   memberchk(Name, Seen)
        ->  read_header_lines(Stream, Header, Seen)
        ;   sub_string(Line, _, After, 0, Value0),
            convert_value(Name, Value0, Value),
            Header = [Name-Value|More],
            read_header_lines(Stream, More, [Name|Seen])
        )
    ).
866
%   convert_value(+Name, +String, -Value)
%
%   Convert a raw header value.  `content-length` becomes a number;
%   every other header is unescaped.

convert_value('content-length', String, Bytes) :-
    !,
    number_string(Bytes, String).
convert_value(_, String, Value) :-
    unescape_header_value(String, Value).

%   unescape_header_value(+String, -Value)
%
%   Undo the escapes `\r`, `\n`, `\c` and `\\`.  A string without a
%   backslash is returned unchanged.
%
%   @error syntax_error(invalid_stomp_escape(Code)) for any other
%          character after a backslash.

unescape_header_value(String, Value) :-
    sub_string(String, _, _, _, "\\"),
    !,
    string_codes(String, Codes),
    phrase(unescape(Plain), Codes),
    string_codes(Value, Plain).
unescape_header_value(String, String).

unescape([H|T]) --> "\\", !, unesc(H), unescape(T).
unescape([H|T]) --> [H], !, unescape(T).
unescape([]) --> [].

unesc(0'\r) --> "r", !.
unesc(0'\n) --> "n", !.
unesc(0':)  --> "c", !.
unesc(0'\\) --> "\\", !.
unesc(_) --> [C], { syntax_error(invalid_stomp_escape(C)) }.
890
895
%   read_content(+Connection, +Stream, +Header, -Content)
%
%   Read the frame body.  Header must contain both `content-length`
%   and `content-type`, otherwise this fails.  The body is read
%   through a range stream limited to `content-length`.

read_content(Connection, Stream, Header, Content) :-
    _{ 'content-length':Length,
       'content-type':MediaType
     } :< Header,
    setup_call_cleanup(
        stream_range_open(Stream, Range, [size(Length)]),
        read_content(Connection, MediaType, Range, Header, Content),
        close(Range)).

%   read_content(+Connection, +Type, +Stream, +Header, -Content)
%
%   "application/json" bodies are parsed with json_read_dict/3 using
%   the connection's `json_options`; anything else is read as a string.

read_content(Connection, "application/json", Stream, _Header, Content) :-
    !,
    query_connection_property(Connection, json_options, JSONOpts),
    json_read_dict(Stream, Content, JSONOpts).
read_content(_Connection, _Type, Stream, _Header, Content) :-
    read_string(Stream, _Length, Content).
911
912
916
%   receive(+Connection, +Stream)
%
%   Main loop of the receiver thread.  Frames are read and handed to
%   process_frame/2 until end of file, an error/2 exception, or
%   failure of read_frame/3; then receive_done/2 takes over.

receive(Connection, Stream) :-
    Caught = error(Formal, _),
    catch(read_frame(Connection, Stream, Frame), Caught, true),
    !,
    (   var(Formal)
    ->  debug(stomp(receive), 'received frame ~p', [Frame]),
        (   Frame == end_of_file
        ->  receive_done(Connection, end_of_file)
        ;   process_frame(Connection, Frame),
            receive(Connection, Stream)
        )
    ;   receive_done(Connection, Caught)
    ).
receive(Connection, _Stream) :-
    receive_done(Connection, "failed to read frame").

%   receive_done(+Connection, +Why)
%
%   Called when the receiver can no longer read.  Prints Why if it is
%   an error term, notifies the callback of `disconnected` and tears
%   the connection down.  Reconnects if `reconnect` is `true`, then
%   detaches the calling thread.

receive_done(Connection, Why) :-
    (   Why = error(_,_)
    ->  print_message(warning, Why)
    ;   true
    ),
    notify(Connection, disconnected),
    stomp_teardown(Connection),
    (   query_connection_property(Connection, reconnect, true)
    ->  debug(stomp(connection), 'Receiver thread reconnects (~p)', [Why]),
        stomp_connect(Connection)
    ;   debug(stomp(connection), 'Receiver thread terminates (~p)', [Why])
    ),
    thread_self(Self),
    thread_detach(Self).
955
959
%   process_frame(+Connection, +Frame)
%
%   Dispatch a received frame.  A heartbeat records its arrival
%   time.  Every frame is passed to the callback via notify/3,4, and
%   non-heartbeat frames first go through process_connect/3.

process_frame(Connection, Frame) :-
    Frame.cmd = heartbeat, !,
    get_time(Stamp),
    debug(stomp(heartbeat), 'received heartbeat at ~w', [Stamp]),
    update_connection_property(Connection, received_heartbeat, Stamp),
    notify(Connection, heartbeat, _{}, "").
process_frame(Connection, Frame) :-
    _{cmd:Type, headers:Hdr, body:Payload} :< Frame,
    !,
    process_connect(Type, Connection, Frame),
    notify(Connection, Type, Hdr, Payload).
process_frame(Connection, Frame) :-
    _{cmd:Type, headers:Hdr} :< Frame,
    process_connect(Type, Connection, Frame),
    notify(Connection, Type, Hdr).

%   process_connect(+FrameType, +Connection, +Frame)
%
%   If a thread is registered as `waiting_thread`, a CONNECTED or
%   ERROR frame is reported to it as stomp(connected(Connection,
%   Status)).  CONNECTED also starts heartbeating when required.
%   All other cases are a no-op.

process_connect(connected, Connection, Frame) :-
    retract(connection_property(Connection, waiting_thread, Waiter)),
    !,
    thread_send_message(Waiter, stomp(connected(Connection, true))),
    start_heartbeat_if_required(Connection, Frame.headers).
process_connect(error, Connection, Frame) :-
    retract(connection_property(Connection, waiting_thread, Waiter)),
    !,
    thread_send_message(
        Waiter,
        stomp(connected(Connection, error(Frame.body)))).
process_connect(_, _, _).
988
%   start_heartbeat_if_required(+Connection, +Headers)
%
%   Start heartbeating if the client stored a `heart-beat` property
%   and the CONNECTED Headers carry one too.

start_heartbeat_if_required(Connection, Headers) :-
    (   query_connection_property(Connection, 'heart-beat', ClientBeat),
        ServerBeat = Headers.get('heart-beat')
    ->  start_heartbeat(Connection, ClientBeat, ServerBeat)
    ;   true
    ).

%   start_heartbeat(+Connection, +CHB, +SHB)
%
%   CHB and SHB are the client and server "X,Y" heart-beat values in
%   milliseconds.  Unless both negotiated intervals are 0, a
%   heartbeat thread is created and registered with the connection.
%   The receive interval gets 2 seconds of slack.

start_heartbeat(Connection, CHB, SHB) :-
    extract_heartbeats(CHB, CX, CY),
    extract_heartbeats(SHB, SX, SY),
    calculate_heartbeats(CX-CY, SX-SY, X-Y),
    \+ (X =:= 0, Y =:= 0),
    !,
    debug(stomp(heartbeat), 'calculated heartbeats are ~p,~p', [X, Y]),
    SendEvery is X / 1000,
    ReceiveEvery is Y / 1000 + 2,
    (   X =:= 0
    ->  Tick = ReceiveEvery
    ;   Y =:= 0
    ->  Tick = SendEvery
    ;   Tick is gcd(X, Y) / 2000
    ),
    get_time(Start),
    gensym(stomp_heartbeat, Alias),
    debug(stomp(heartbeat), 'Creating heartbeat thread (~p ~p ~p)',
          [SendEvery, ReceiveEvery, Tick]),
    thread_create(heartbeat_loop(Connection, SendEvery, ReceiveEvery,
                                 Tick, Start, Start),
                  Beater, [alias(Alias)]),
    update_connection_mapping(Connection,
                              _{ heartbeat_thread_id:Beater,
                                 received_heartbeat:Start
                               }).
start_heartbeat(_, _, _).
1024
%   extract_heartbeats(+Heartbeat, -X, -Y)
%
%   Split a heart-beat value "X,Y" into its two numbers.  Spaces
%   around the fields are ignored.  Fails unless there are exactly
%   two numeric fields.

extract_heartbeats(Heartbeat, X, Y) :-
    split_string(Heartbeat, ",", " ", [XS, YS]),
    number_string(X, XS),
    number_string(Y, YS).
1029
%   calculate_heartbeats(+Client, +Server, -Negotiated)
%
%   Each argument is a pair X-Y.  The negotiated X is the larger of
%   the client's X and the server's Y, or 0 if either is 0.  The
%   negotiated Y is the larger of the client's Y and the server's X,
%   or 0 if either is 0.

calculate_heartbeats(CX-CY, SX-SY, X-Y) :-
    negotiated_interval(CX, SY, X),
    negotiated_interval(CY, SX, Y).

negotiated_interval(Mine, Theirs, Interval) :-
    (   Mine =\= 0, Theirs =\= 0
    ->  Interval is max(Mine, floor(Theirs))
    ;   Interval = 0
    ).
1039
%   heartbeat_loop(+Connection, +SendSleep, +ReceiveSleep, +SleepTime,
%                  +SendTime, +ReceiveTime)
%
%   Body of the heartbeat thread; never returns.  Every SleepTime
%   seconds it sends a heartbeat if more than SendSleep has passed
%   since SendTime, and checks incoming heartbeats if more than
%   ReceiveSleep has passed since ReceiveTime.

heartbeat_loop(Connection, SendSleep, ReceiveSleep, SleepTime,
               SendTime, ReceiveTime) :-
    sleep(SleepTime),
    get_time(Now),
    (   Now - SendTime > SendSleep
    ->  NextSendTime = Now,
        debug(stomp(heartbeat), 'sending a heartbeat message at ~p', [Now]),
        send_frame(Connection, heartbeat, _{})
    ;   NextSendTime = SendTime
    ),
    (   Now - ReceiveTime > ReceiveSleep
    ->  NextReceiveTime = Now,
        check_receive_heartbeat(Connection, Now, ReceiveSleep)
    ;   NextReceiveTime = ReceiveTime
    ),
    heartbeat_loop(Connection, SendSleep, ReceiveSleep, SleepTime,
                   NextSendTime, NextReceiveTime).

%   check_receive_heartbeat(+Connection, +Now, +ReceiveSleep)
%
%   Notify the callback with `heartbeat_timeout` if the stored
%   `received_heartbeat` time is more than ReceiveSleep before Now.

check_receive_heartbeat(Connection, Now, ReceiveSleep) :-
    query_connection_property(Connection, received_heartbeat, LastSeen),
    Silence is Now - LastSeen,
    (   Silence > ReceiveSleep
    ->  debug(stomp(heartbeat),
              'Heartbeat timeout: diff_receive=~p, time=~p, lastrec=~p',
              [Silence, Now, LastSeen]),
        notify(Connection, heartbeat_timeout)
    ;   true
    ).
1068
1074
%   notify(+Connection, +FrameType)
%   notify(+Connection, +FrameType, +Header)
%   notify(+Connection, +FrameType, +Header, +Body)
%
%   Call the connection callback as
%   call(Callback, FrameType, Connection, Header, Body).  Header
%   defaults to stomp_header{cmd:FrameType} and Body to "".  An
%   error/2 exception from the callback is printed as a warning and
%   failure is ignored, so notify always succeeds once the callback
%   property is found.

notify(Connection, FrameType) :-
    notify(Connection, FrameType, stomp_header{cmd:FrameType}, "").

notify(Connection, FrameType, Header) :-
    notify(Connection, FrameType, Header, "").

notify(Connection, FrameType, Header, Body) :-
    query_connection_property(Connection, callback, Handler),
    Caught = error(Formal,_),
    (   catch_with_backtrace(
            call(Handler, FrameType, Connection, Header, Body),
            Caught, true)
    ->  (   var(Formal)
        ->  true
        ;   print_message(warning, Caught)
        )
    ;   true
    ).
1093
%   update_connection_mapping(+Connection, +Dict)
%
%   Store every key-value pair of Dict as a property of Connection.

update_connection_mapping(Connection, Dict) :-
    dict_pairs(Dict, _Tag, KeyValues),
    maplist(update_connection_property(Connection), KeyValues).

%   update_connection_property(+Connection, +NameValue)
%   update_connection_property(+Connection, +Name, +Value)
%
%   Replace the value of property Name.  The retract and assert run
%   inside transaction/1.

update_connection_property(Connection, Key-Val) :-
    update_connection_property(Connection, Key, Val).

update_connection_property(Connection, Name, Value) :-
    transaction(update_connection_property_(Connection, Name, Value)).

update_connection_property_(Connection, Name, Value) :-
    retractall(connection_property(Connection, Name, _)),
    asserta(connection_property(Connection, Name, Value)).

%   query_connection_property(?Connection, ?Name, ?Value)
%
%   Look up a property.  Commits to the first match when both
%   Connection and Name are bound; otherwise it enumerates.

query_connection_property(Connection, Name, Value) :-
    (   nonvar(Name),
        nonvar(Connection)
    ->  connection_property(Connection, Name, Value),
        !
    ;   connection_property(Connection, Name, Value)
    ).
1115
1116
1117 1120
% Message hook for stomp_error(connect, Connection, Detail).  The
% address is looked up from the connection properties.  An
% error(Message) detail is printed without its wrapper.

:- multifile prolog:error_message//1.

prolog:error_message(stomp_error(connect, Connection, error(Message))) -->
    { connection_property(Connection, address, Address) },
    [ 'STOMP: Failed to connect to ~p: ~p'-[Address, Message] ].
prolog:error_message(stomp_error(connect, Connection, Detail)) -->
    { connection_property(Connection, address, Address) },
    [ 'STOMP: Failed to connect to ~p: ~p'-[Address, Detail] ].