41
42:- module(redis,
43 [ redis_server/3, 44 redis_connect/1, 45 redis_connect/3, 46 redis_disconnect/1, 47 redis_disconnect/2, 48 49 redis/1, 50 redis/2, 51 redis/3, 52 53 redis_get_list/3, 54 redis_get_list/4, 55 redis_set_list/3, 56 redis_get_hash/3, 57 redis_set_hash/3, 58 redis_scan/3, 59 redis_sscan/4, 60 redis_hscan/4, 61 redis_zscan/4, 62 63 redis_subscribe/4, 64 redis_subscribe/2, 65 redis_unsubscribe/2, 66 redis_current_subscription/2, 67 redis_write/2, 68 redis_read/2, 69 70 redis_array_dict/3, 71 72 redis_property/2, 73 redis_current_command/2, 74 redis_current_command/3, 75
76 sentinel_slave/4 77 ]). 78:- autoload(library(socket), [tcp_connect/3]). 79:- autoload(library(apply), [maplist/2, convlist/3, maplist/3, maplist/5]). 80:- autoload(library(broadcast), [broadcast/1]). 81:- autoload(library(error),
82 [ must_be/2,
83 type_error/2,
84 instantiation_error/1,
85 uninstantiation_error/1,
86 existence_error/2,
87 existence_error/3
88 ]). 89:- autoload(library(lazy_lists), [lazy_list/2]). 90:- autoload(library(lists), [append/3, member/2]). 91:- autoload(library(option), [merge_options/3, option/2,
92 option/3, select_option/4]). 93:- autoload(library(pairs), [group_pairs_by_key/2]). 94:- autoload(library(time), [call_with_time_limit/2]). 95:- use_module(library(debug), [debug/3, assertion/1]). 96:- use_module(library(settings), [setting/4, setting/2]). 97:- if(exists_source(library(ssl))). 98:- autoload(library(ssl), [ssl_context/3, ssl_negotiate/5]). 99:- endif. 100
101:- use_foreign_library(foreign(redis4pl)). 102
103:- setting(max_retry_count, nonneg, 8640, 104 "Max number of retries"). 105:- setting(max_retry_wait, number, 10,
106 "Max time to wait between recovery attempts"). 107:- setting(sentinel_timeout, number, 0.2,
108 "Time to wait for a sentinel"). 109
110:- predicate_options(redis_server/3, 3,
111 [ pass_to(redis:redis_connect/3, 3)
112 ]). 113:- predicate_options(redis_connect/3, 3,
114 [ reconnect(boolean),
115 user(atom),
116 password(atomic),
117 version(between(2,3))
118 ]). 119:- predicate_options(redis_disconnect/2, 2,
120 [ force(boolean)
121 ]). 122:- predicate_options(redis_scan/3, 3,
123 [ match(atomic),
124 count(nonneg),
125 type(atom)
126 ]). 128:- predicate_options(redis_sscan/4, 4, [pass_to(redis:redis_scan/3, 3)]). 129:- predicate_options(redis_hscan/4, 4, [pass_to(redis:redis_scan/3, 3)]). 130:- predicate_options(redis_zscan/4, 4, [pass_to(redis:redis_scan/3, 3)]). 131
132
150
151:- dynamic server/3. 152
153:- dynamic ( connection/2, 154 sentinel/2 155 ) as volatile. 156
168
169redis_server(Alias, Address, Options) :-
170 must_be(ground, Alias),
171 retractall(server(Alias, _, _)),
172 asserta(server(Alias, Address, Options)).
173
174server(default, localhost:6379, []).
175
229
230redis_connect(Conn) :-
231 redis_connect(default, Conn, []).
232
233redis_connect(Conn, Host, Port) :-
234 var(Conn),
235 ground(Host), ground(Port),
236 !, 237 redis_connect(Host:Port, Conn, []).
238redis_connect(Server, Conn, Options) :-
239 atom(Server),
240 !,
241 ( server(Server, Address, DefaultOptions)
242 -> merge_options(Options, DefaultOptions, Options2),
243 do_connect(Server, Address, Conn, [address(Address)|Options2])
244 ; existence_error(redis_server, Server)
245 ).
246redis_connect(Address, Conn, Options) :-
247 do_connect(Address, Address, Conn, [address(Address)|Options]).
248
254
255do_connect(Id, sentinel(Pool), Conn, Options) =>
256 sentinel_master(Id, Pool, Conn, Options).
257do_connect(Id, Address0, Conn, Options) =>
258 tcp_address(Address0, Address),
259 tcp_connect(Address, Stream0, Options),
260 tls_upgrade(Address, Stream0, Stream, Options),
261 Conn = redis_connection(Id, Stream, 0, Options),
262 hello(Conn, Options).
263
264tcp_address(unix(Path), Path) :-
265 !. 266tcp_address(Address, Address).
267
271
272:- if(current_predicate(ssl_context/3)). 273tls_upgrade(Host:_Port, Raw, Stream, Options) :-
274 option(tls(true), Options),
275 !,
276 must_have_option(cacert(CacertFile), Options),
277 must_have_option(key(KeyFile), Options),
278 must_have_option(cert(CertFile), Options),
279 ssl_context(client, SSL,
280 [ host(Host),
281 certificate_file(CertFile),
282 key_file(KeyFile),
283 cacerts([file(CacertFile)]),
284 cert_verify_hook(tls_verify),
285 close_parent(true)
286 ]),
287 stream_pair(Raw, RawRead, RawWrite),
288 ssl_negotiate(SSL, RawRead, RawWrite, Read, Write),
289 stream_pair(Stream, Read, Write).
290:- endif. 291tls_upgrade(_, Stream, Stream, _).
292
293:- if(current_predicate(ssl_context/3)). 294
300
301:- public tls_verify/5. 302tls_verify(_SSL, _ProblemCert, _AllCerts, _FirstCert, verified) :-
303 !.
304tls_verify(_SSL, _ProblemCert, _AllCerts, _FirstCert, hostname_mismatch) :-
305 !.
306tls_verify(_SSL, _ProblemCert, _AllCerts, _FirstCert, _Error) :-
307 fail.
308
309:- endif. 310
314
315sentinel_master(Id, Pool, Master, Options) :-
316 sentinel_connect(Id, Pool, Conn, Options),
317 setting(sentinel_timeout, TMO),
318 call_cleanup(
319 query_sentinel(Pool, Conn, MasterAddr),
320 redis_disconnect(Conn)),
321 debug(redis(sentinel), 'Sentinel claims master is at ~p', [MasterAddr]),
322 do_connect(Id, MasterAddr, Master, Options),
323 debug(redis(sentinel), 'Connected to claimed master', []),
324 redis(Master, role, Role),
325 ( Role = [master|_Slaves]
326 -> debug(redis(sentinel), 'Verified role at ~p', [MasterAddr])
327 ; redis_disconnect(Master),
328 debug(redis(sentinel), '~p is not the master: ~p', [MasterAddr, Role]),
329 sleep(TMO),
330 sentinel_master(Id, Pool, Master, Options)
331 ).
332
333sentinel_connect(Id, Pool, Conn, Options) :-
334 must_have_option(sentinels(Sentinels), Options),
335 sentinel_auth(Options, Options1),
336 setting(sentinel_timeout, TMO),
337 ( sentinel(Pool, Sentinel)
338 ; member(Sentinel, Sentinels)
339 ),
340 catch(call_with_time_limit(
341 TMO,
342 do_connect(Id, Sentinel, Conn,
343 [sentinel(true)|Options1])),
344 Error,
345 (print_message(warning, Error),fail)),
346 !,
347 debug(redis(sentinel), 'Connected to sentinel at ~p', [Sentinel]),
348 redis(Conn, sentinel(sentinels, Pool), Peers),
349 transaction(update_known_sentinels(Pool, Sentinel, Peers)).
350
351sentinel_auth(Options0, Options) :-
352 option(sentinel_user(User), Options0),
353 option(sentinel_password(Passwd), Options0),
354 !,
355 merge_options([user(User), password(Passwd)], Options0, Options).
356sentinel_auth(Options0, Options) :-
357 select_option(password(_), Options0, Options, _).
358
359
360query_sentinel(Pool, Conn, Host:Port) :-
361 redis(Conn, sentinel('get-master-addr-by-name', Pool), MasterData),
362 MasterData = [Host,Port].
363
364update_known_sentinels(Pool, Sentinel, Peers) :-
365 retractall(sentinel(Pool, _)),
366 maplist(update_peer_sentinel(Pool), Peers),
367 asserta(sentinel(Pool, Sentinel)).
368
369update_peer_sentinel(Pool, Attrs),
370 memberchk(ip-Host, Attrs),
371 memberchk(port-Port, Attrs) =>
372 asserta(sentinel(Pool, Host:Port)).
373
374must_have_option(Opt, Options) :-
375 option(Opt, Options),
376 !.
377must_have_option(Opt, Options) :-
378 existence_error(option, Opt, Options).
379
386
387sentinel_slave(ServerId, Pool, Slave, Options) :-
388 sentinel_connect(ServerId, Pool, Conn, Options),
389 redis(Conn, sentinel(slaves, Pool), Slaves),
390 member(Pairs, Slaves),
391 dict_create(Slave, redis, Pairs).
392
397
398hello(Con, Options) :-
399 option(version(V), Options),
400 V >= 3,
401 !,
402 ( option(user(User), Options),
403 option(password(Password), Options)
404 -> redis(Con, hello(3, auth, User, Password))
405 ; redis(Con, hello(3))
406 ).
407hello(Con, Options) :-
408 option(password(Password), Options),
409 !,
410 redis(Con, auth(Password)).
411hello(_, _).
412
419
420redis_stream(Var, S, _) :-
421 ( var(Var)
422 -> !, instantiation_error(Var)
423 ; nonvar(S)
424 -> !, uninstantiation_error(S)
425 ).
426redis_stream(ServerName, S, Connect) :-
427 atom(ServerName),
428 !,
429 ( connection(ServerName, S0)
430 -> S = S0
431 ; Connect == true,
432 server(ServerName, Address, Options)
433 -> redis_connect(Address, Connection, Options),
434 redis_stream(Connection, S, false),
435 asserta(connection(ServerName, S))
436 ; existence_error(redis_server, ServerName)
437 ).
438redis_stream(redis_connection(_,S0,_,_), S, _) :-
439 S0 \== (-),
440 !,
441 S = S0.
442redis_stream(Redis, S, _) :-
443 Redis = redis_connection(Id,-,_,Options),
444 option(address(Address), Options),
445 do_connect(Id,Address,Redis2,Options),
446 arg(2, Redis2, S0),
447 nb_setarg(2, Redis, S0),
448 S = S0.
449
450has_redis_stream(Var, _) :-
451 var(Var),
452 !,
453 instantiation_error(Var).
454has_redis_stream(Alias, S) :-
455 atom(Alias),
456 !,
457 connection(Alias, S).
458has_redis_stream(redis_connection(_,S,_,_), S) :-
459 S \== (-).
460
461
474
475redis_disconnect(Redis) :-
476 redis_disconnect(Redis, []).
477
478redis_disconnect(Redis, Options) :-
479 option(force(true), Options),
480 !,
481 ( Redis = redis_connection(_Id, S, _, _Opts)
482 -> ( S == (-)
483 -> true
484 ; close(S, [force(true)]),
485 nb_setarg(2, Redis, -)
486 )
487 ; has_redis_stream(Redis, S)
488 -> close(S, [force(true)]),
489 retractall(connection(_,S))
490 ; true
491 ).
492redis_disconnect(Redis, _Options) :-
493 redis_stream(Redis, S, false),
494 close(S),
495 retractall(connection(_,S)).
496
538
539redis(Redis, PipeLine) :-
540 is_list(PipeLine),
541 !,
542 redis_pipeline(Redis, PipeLine).
543redis(Redis, Req) :-
544 redis(Redis, Req, _).
545
663
664redis(Redis, Req, Out) :-
665 out_val(Out, Val),
666 redis1(Redis, Req, Out),
667 Val \== nil.
668
669out_val(Out, Val) :-
670 ( nonvar(Out),
671 Out = (Val as _)
672 -> true
673 ; Val = Out
674 ).
675
676redis1(Redis, Req, Out) :-
677 Error = error(Formal, _),
678 catch(redis2(Redis, Req, Out), Error, true),
679 ( var(Formal)
680 -> true
681 ; recover(Error, Redis, redis1(Redis, Req, Out))
682 ).
683
684redis2(Redis, Req, Out) :-
685 atom(Redis),
686 !,
687 redis_stream(Redis, S, true),
688 with_mutex(Redis,
689 ( redis_write_msg(S, Req),
690 redis_read_stream(Redis, S, Out)
691 )).
692redis2(Redis, Req, Out) :-
693 redis_stream(Redis, S, true),
694 redis_write_msg(S, Req),
695 redis_read_stream(Redis, S, Out).
696
698
699redis_pipeline(Redis, PipeLine) :-
700 Error = error(Formal, _),
701 catch(redis_pipeline2(Redis, PipeLine), Error, true),
702 ( var(Formal)
703 -> true
704 ; recover(Error, Redis, redis_pipeline(Redis, PipeLine))
705 ).
706
707redis_pipeline2(Redis, PipeLine) :-
708 atom(Redis),
709 !,
710 redis_stream(Redis, S, true),
711 with_mutex(Redis,
712 redis_pipeline3(Redis, S, PipeLine)).
713redis_pipeline2(Redis, PipeLine) :-
714 redis_stream(Redis, S, true),
715 redis_pipeline3(Redis, S, PipeLine).
716
717redis_pipeline3(Redis, S, PipeLine) :-
718 maplist(write_pipeline(S), PipeLine),
719 flush_output(S),
720 read_pipeline(Redis, S, PipeLine).
721
722write_pipeline(S, Command -> _Reply) :-
723 !,
724 redis_write_msg_no_flush(S, Command).
725write_pipeline(S, Command) :-
726 redis_write_msg_no_flush(S, Command).
727
728read_pipeline(Redis, S, PipeLine) :-
729 E = error(Formal,_),
730 catch(read_pipeline2(Redis, S, PipeLine), E, true),
731 ( var(Formal)
732 -> true
733 ; reconnect_error(E)
734 -> redis_disconnect(Redis, [force(true)]),
735 throw(E)
736 ; resync(Redis),
737 throw(E)
738 ).
739
740read_pipeline2(Redis, S, PipeLine) :-
741 maplist(redis_read_msg3(S), PipeLine, Replies, Errors, Pushed),
742 maplist(handle_push(Redis), Pushed),
743 maplist(handle_error, Errors),
744 maplist(bind_reply, PipeLine, Replies).
745
746redis_read_msg3(S, _Command -> ReplyIn, Reply, Error, Push) :-
747 !,
748 redis_read_msg(S, ReplyIn, Reply, Error, Push).
749redis_read_msg3(S, Var, Reply, Error, Push) :-
750 redis_read_msg(S, Var, Reply, Error, Push).
751
752handle_push(Redis, Pushed) :-
753 handle_push_messages(Pushed, Redis).
754handle_error(Error) :-
755 ( var(Error)
756 -> true
757 ; throw(Error)
758 ).
759bind_reply(_Command -> Reply0, Reply) :-
760 !,
761 Reply0 = Reply.
762bind_reply(_Command, _).
763
764
770
771:- meta_predicate recover(+, +, 0). 772
773recover(Error, Redis, Goal) :-
774 Error = error(Formal, _),
775 reconnect_error(Formal),
776 auto_reconnect(Redis),
777 !,
778 debug(redis(recover), '~p: got error ~p; trying to reconnect',
779 [Redis, Error]),
780 redis_disconnect(Redis, [force(true)]),
781 ( wait_to_retry(Redis, Error)
782 -> call(Goal),
783 retractall(failure(Redis, _))
784 ; throw(Error)
785 ).
786recover(Error, _, _) :-
787 throw(Error).
788
789auto_reconnect(redis_connection(_,_,_,Options)) :-
790 !,
791 option(reconnect(true), Options).
792auto_reconnect(Server) :-
793 ground(Server),
794 server(Server, _, Options),
795 option(reconnect(true), Options, true).
796
797reconnect_error(io_error(_Action, _On)).
798reconnect_error(socket_error(_Code, _)).
799reconnect_error(syntax_error(unexpected_eof)).
800reconnect_error(existence_error(stream, _)).
801
808
809:- dynamic failure/2 as volatile. 810
811wait_to_retry(Redis, Error) :-
812 redis_failures(Redis, Failures),
813 setting(max_retry_count, Count),
814 Failures < Count,
815 Failures2 is Failures+1,
816 redis_set_failures(Redis, Failures2),
817 setting(max_retry_wait, MaxWait),
818 Wait is min(MaxWait*100, 1<<Failures)/100.0,
819 debug(redis(recover), ' Sleeping ~p seconds', [Wait]),
820 retry_message_level(Failures, Level),
821 print_message(Level, redis(retry(Redis, Failures, Wait, Error))),
822 sleep(Wait).
823
824redis_failures(redis_connection(_,_,Failures0,_), Failures) :-
825 !,
826 Failures = Failures0.
827redis_failures(Server, Failures) :-
828 atom(Server),
829 ( failure(Server, Failures)
830 -> true
831 ; Failures = 0
832 ).
833
834redis_set_failures(Connection, Count) :-
835 compound(Connection),
836 !,
837 nb_setarg(3, Connection, Count).
838redis_set_failures(Server, Count) :-
839 atom(Server),
840 retractall(failure(Server, _)),
841 asserta(failure(Server, Count)).
842
843retry_message_level(0, warning) :- !.
844retry_message_level(_, silent).
845
846
852
853redis(Req) :-
854 setup_call_cleanup(
855 redis_connect(default, C, []),
856 redis1(C, Req, Out),
857 redis_disconnect(C)),
858 print(Out).
859
865
866redis_write(Redis, Command) :-
867 redis_stream(Redis, S, true),
868 redis_write_msg(S, Command).
869
870redis_read(Redis, Reply) :-
871 redis_stream(Redis, S, true),
872 redis_read_stream(Redis, S, Reply).
873
874
875 878
896
897redis_get_list(Redis, Key, List) :-
898 redis_get_list(Redis, Key, -1, List).
899
900redis_get_list(Redis, Key, Chunk, List) :-
901 redis(Redis, llen(Key), Len),
902 ( Len == status(ok)
903 -> List = []
904 ; ( Chunk >= Len
905 ; Chunk == -1
906 )
907 -> ( Len == 0
908 -> List = []
909 ; End is Len-1,
910 list_range(Redis, Key, 0, End, List)
911 )
912 ; lazy_list(rlist_next(s(Redis,Key,0,Chunk,Len)), List)
913 ).
914
915rlist_next(State, List, Tail) :-
916 State = s(Redis,Key,Offset,Slice,Len),
917 End is min(Len-1, Offset+Slice-1),
918 list_range(Redis, Key, Offset, End, Elems),
919 ( End =:= Len-1
920 -> List = Elems,
921 Tail = []
922 ; Offset2 is Offset+Slice,
923 nb_setarg(3, State, Offset2),
924 append(Elems, Tail, List)
925 ).
926
928
929list_range(DB, Key, Start, Start, [Elem]) :-
930 !,
931 redis(DB, lindex(Key, Start), Elem).
932list_range(DB, Key, Start, End, List) :-
933 !,
934 redis(DB, lrange(Key, Start, End), List).
935
936
937
944
945redis_set_list(Redis, Key, List) :-
946 redis(Redis, del(Key), _),
947 ( List == []
948 -> true
949 ; Term =.. [rpush,Key|List],
950 redis(Redis, Term, _Count)
951 ).
952
953
963
964redis_get_hash(Redis, Key, Dict) :-
965 redis(Redis, hgetall(Key), Dict as dict(auto)).
966
967redis_set_hash(Redis, Key, Dict) :-
968 redis_array_dict(Array, _, Dict),
969 Term =.. [hset,Key|Array],
970 redis(Redis, del(Key), _),
971 redis(Redis, Term, _Count).
972
981
982redis_array_dict(Array, Tag, Dict) :-
983 nonvar(Array),
984 !,
985 array_to_pairs(Array, Pairs),
986 dict_pairs(Dict, Tag, Pairs).
987redis_array_dict(TwoList, Tag, Dict) :-
988 dict_pairs(Dict, Tag, Pairs),
989 pairs_to_array(Pairs, TwoList).
990
991array_to_pairs([], []) :-
992 !.
993array_to_pairs([NameS-Value|T0], [Name-Value|T]) :-
994 !, 995 atom_string(Name, NameS),
996 array_to_pairs(T0, T).
997array_to_pairs([NameS,Value|T0], [Name-Value|T]) :-
998 atom_string(Name, NameS),
999 array_to_pairs(T0, T).
1000
1001pairs_to_array([], []) :-
1002 !.
1003pairs_to_array([Name-Value|T0], [NameS,Value|T]) :-
1004 atom_string(Name, NameS),
1005 pairs_to_array(T0, T).
1006
1028
1029redis_scan(Redis, LazyList, Options) :-
1030 scan_options([match,count,type], Options, Parms),
1031 lazy_list(scan_next(s(scan,Redis,0,Parms)), LazyList).
1032
1033redis_sscan(Redis, Set, LazyList, Options) :-
1034 scan_options([match,count,type], Options, Parms),
1035 lazy_list(scan_next(s(sscan(Set),Redis,0,Parms)), LazyList).
1036
1037redis_hscan(Redis, Hash, LazyList, Options) :-
1038 scan_options([match,count,type], Options, Parms),
1039 lazy_list(scan_next(s(hscan(Hash),Redis,0,Parms)), LazyList).
1040
1041redis_zscan(Redis, Set, LazyList, Options) :-
1042 scan_options([match,count,type], Options, Parms),
1043 lazy_list(scan_next(s(zscan(Set),Redis,0,Parms)), LazyList).
1044
1045scan_options([], _, []).
1046scan_options([H|T0], Options, [H,V|T]) :-
1047 Term =.. [H,V],
1048 option(Term, Options),
1049 !,
1050 scan_options(T0, Options, T).
1051scan_options([_|T0], Options, T) :-
1052 scan_options(T0, Options, T).
1053
1054
1055scan_next(State, List, Tail) :-
1056 State = s(Command,Redis,Cursor,Params),
1057 Command =.. CList,
1058 append(CList, [Cursor|Params], CList2),
1059 Term =.. CList2,
1060 redis(Redis, Term, [NewCursor,Elems0]),
1061 scan_pairs(Command, Elems0, Elems),
1062 ( NewCursor == 0
1063 -> List = Elems,
1064 Tail = []
1065 ; nb_setarg(3, State, NewCursor),
1066 append(Elems, Tail, List)
1067 ).
1068
1069scan_pairs(hscan(_), List, Pairs) :-
1070 !,
1071 scan_pairs(List, Pairs).
1072scan_pairs(zscan(_), List, Pairs) :-
1073 !,
1074 scan_pairs(List, Pairs).
1075scan_pairs(_, List, List).
1076
1077scan_pairs([], []).
1078scan_pairs([Key,Value|T0], [Key-Value|T]) :-
1079 !,
1080 scan_pairs(T0, T).
1081scan_pairs([Key-Value|T0], [Key-Value|T]) :-
1082 scan_pairs(T0, T).
1083
1084
1085 1088
1095
1096redis_current_command(Redis, Command) :-
1097 redis_current_command(Redis, Command, _).
1098
1099redis_current_command(Redis, Command, Properties) :-
1100 nonvar(Command),
1101 !,
1102 redis(Redis, command(info, Command), [[_|Properties]]).
1103redis_current_command(Redis, Command, Properties) :-
1104 redis(Redis, command, Commands),
1105 member([Name|Properties], Commands),
1106 atom_string(Command, Name).
1107
1113
1114redis_property(Redis, Property) :-
1115 redis(Redis, info, String),
1116 info_terms(String, Terms),
1117 member(Property, Terms).
1118
1119info_terms(Info, Pairs) :-
1120 split_string(Info, "\n", "\r\n ", Lines),
1121 convlist(info_line_term, Lines, Pairs).
1122
1123info_line_term(Line, Term) :-
1124 sub_string(Line, B, _, A, :),
1125 !,
1126 sub_atom(Line, 0, B, _, Name),
1127 \+ sub_atom(Name, _, _, 0, '_human'),
1128 sub_string(Line, _, A, 0, ValueS),
1129 ( number_string(Value, ValueS)
1130 -> true
1131 ; Value = ValueS
1132 ),
1133 Term =.. [Name,Value].
1134
1135
1136 1139
1167
1168:- dynamic ( subscription/2, 1169 listening/3 1170 ) as volatile. 1171
1172redis_subscribe(Redis, Spec, Id, Options) :-
1173 atom(Redis),
1174 !,
1175 channels(Spec, Channels),
1176 pubsub_thread_options(ThreadOptions, Options),
1177 thread_create(setup_call_cleanup(
1178 redis_connect(Redis, Conn, [reconnect(true)]),
1179 redis_subscribe1(Redis, Conn, Channels),
1180 redis_disconnect(Conn)),
1181 Thread,
1182 ThreadOptions),
1183 pubsub_id(Thread, Id).
1184redis_subscribe(Redis, Spec, Id, Options) :-
1185 channels(Spec, Channels),
1186 pubsub_thread_options(ThreadOptions, Options),
1187 thread_create(redis_subscribe1(Redis, Redis, Channels),
1188 Thread,
1189 ThreadOptions),
1190 pubsub_id(Thread, Id).
1191
1192pubsub_thread_options(ThreadOptions, Options) :-
1193 merge_options(Options, [detached(true)], ThreadOptions).
1194
1195pubsub_id(Thread, Thread).
1199
1200redis_subscribe1(Redis, Conn, Channels) :-
1201 Error = error(Formal, _),
1202 catch(redis_subscribe2(Redis, Conn, Channels), Error, true),
1203 ( var(Formal)
1204 -> true
1205 ; recover(Error, Conn, redis1(Conn, echo("reconnect"), _)),
1206 thread_self(Me),
1207 pubsub_id(Me, Id),
1208 findall(Channel, subscription(Id, Channel), CurrentChannels),
1209 redis_subscribe1(Redis, Conn, CurrentChannels)
1210 ).
1211
1212redis_subscribe2(Redis, Conn, Channels) :-
1213 redis_subscribe3(Conn, Channels),
1214 redis_listen(Redis, Conn).
1215
1216redis_subscribe3(Conn, Channels) :-
1217 thread_self(Me),
1218 pubsub_id(Me, Id),
1219 prolog_listen(this_thread_exit, pubsub_clean(Id)),
1220 maplist(register_subscription(Id), Channels),
1221 redis_stream(Conn, S, true),
1222 Req =.. [subscribe|Channels],
1223 redis_write_msg(S, Req).
1224
1225pubsub_clean(Id) :-
1226 retractall(listening(Id, _Connection, _Thread)),
1227 retractall(subscription(Id, _Channel)).
1228
1238
1239redis_subscribe(Id, Spec) :-
1240 channels(Spec, Channels),
1241 ( listening(Id, Connection, _Thread)
1242 -> true
1243 ; existence_error(redis_pubsub, Id)
1244 ),
1245 maplist(register_subscription(Id), Channels),
1246 redis_stream(Connection, S, true),
1247 Req =.. [subscribe|Channels],
1248 redis_write_msg(S, Req).
1249
1250redis_unsubscribe(Id, Spec) :-
1251 channels(Spec, Channels),
1252 ( listening(Id, Connection, _Thread)
1253 -> true
1254 ; existence_error(redis_pubsub, Id)
1255 ),
1256 maplist(unregister_subscription(Id), Channels),
1257 redis_stream(Connection, S, true),
1258 Req =.. [unsubscribe|Channels],
1259 redis_write_msg(S, Req).
1260
1264
1265redis_current_subscription(Id, Channels) :-
1266 findall(Id-Channel, subscription(Id, Channel), Pairs),
1267 keysort(Pairs, Sorted),
1268 group_pairs_by_key(Sorted, Grouped),
1269 member(Id-Channels, Grouped).
1270
1271channels(Spec, List) :-
1272 is_list(Spec),
1273 !,
1274 maplist(channel_name, Spec, List).
1275channels(Ch, [Key]) :-
1276 channel_name(Ch, Key).
1277
1278channel_name(Atom, Atom) :-
1279 atom(Atom),
1280 !.
1281channel_name(Key, Atom) :-
1282 phrase(key_parts(Key), Parts),
1283 !,
1284 atomic_list_concat(Parts, :, Atom).
1285channel_name(Key, _) :-
1286 type_error(redis_key, Key).
1287
1288key_parts(Var) -->
1289 { var(Var), !, fail }.
1290key_parts(Atom) -->
1291 { atom(Atom) },
1292 !,
1293 [Atom].
1294key_parts(A:B) -->
1295 key_parts(A),
1296 key_parts(B).
1297
1298
1299
1300
1301register_subscription(Id, Channel) :-
1302 ( subscription(Id, Channel)
1303 -> true
1304 ; assertz(subscription(Id, Channel))
1305 ).
1306
1307unregister_subscription(Id, Channel) :-
1308 retractall(subscription(Id, Channel)).
1309
1310redis_listen(Redis, Conn) :-
1311 thread_self(Me),
1312 pubsub_id(Me, Id),
1313 setup_call_cleanup(
1314 assertz(listening(Id, Conn, Me), Ref),
1315 redis_listen_loop(Redis, Id, Conn),
1316 erase(Ref)).
1317
1318redis_listen_loop(Redis, Id, Conn) :-
1319 redis_stream(Conn, S, true),
1320 ( subscription(Id, _)
1321 -> redis_read_stream(Redis, S, Reply),
1322 redis_broadcast(Redis, Reply),
1323 redis_listen_loop(Redis, Id, Conn)
1324 ; true
1325 ).
1326
1327redis_broadcast(_, [subscribe, _Channel, _N]) :-
1328 !.
1329redis_broadcast(Redis, [message, Channel, Data]) :-
1330 !,
1331 catch(broadcast(redis(Redis, Channel, Data)),
1332 Error,
1333 print_message(error, Error)).
1334redis_broadcast(Redis, Message) :-
1335 assertion((Message = [Type, Channel, _Data],
1336 atom(Type),
1337 atom(Channel))),
1338 debug(redis(warning), '~p: Unknown message while listening: ~p',
1339 [Redis,Message]).
1340
1341
1342 1345
1360
1361redis_read_stream(Redis, SI, Out) :-
1362 E = error(Formal,_),
1363 catch(redis_read_msg(SI, Out, Out0, Error, Push), E, true),
1364 ( var(Formal)
1365 -> handle_push_messages(Push, Redis),
1366 ( var(Error)
1367 -> Out = Out0
1368 ; resync(Redis),
1369 throw(Error)
1370 )
1371 ; redis_disconnect(Redis, [force(true)]),
1372 throw(E)
1373 ).
1374
1375handle_push_messages([], _).
1376handle_push_messages([H|T], Redis) :-
1377 ( catch(handle_push_message(H, Redis), E,
1378 print_message(warning, E))
1379 -> true
1380 ; true
1381 ),
1382 handle_push_messages(T, Redis).
1383
1384handle_push_message(["pubsub"|List], Redis) :-
1385 redis_broadcast(Redis, List).
1389handle_push_message([message|List], Redis) :-
1390 redis_broadcast(Redis, [message|List]).
1391
1392
1399
1400resync(Redis) :-
1401 E = error(Formal,_),
1402 catch(do_resync(Redis), E, true),
1403 ( var(Formal)
1404 -> true
1405 ; redis_disconnect(Redis, [force(true)])
1406 ).
1407
1408do_resync(Redis) :-
1409 A is random(1_000_000_000),
1410 redis_stream(Redis, S, true),
1411 redis_write_msg(S, echo(A)),
1412 catch(call_with_time_limit(0.2, '$redis_resync'(S, A)),
1413 time_limit_exceeded,
1414 throw(error(time_limit_exceeded,_))).
1415
1416
1426
1427
1428
1429 1432
1433:- multifile
1434 prolog:error_message//1,
1435 prolog:message//1. 1436
1437prolog:error_message(redis_error(Code, String)) -->
1438 [ 'REDIS: ~w: ~s'-[Code, String] ].
1439
1440prolog:message(redis(retry(_Redis, _Failures, Wait, Error))) -->
1441 [ 'REDIS: connection error. Retrying in ~2f seconds'-[Wait], nl ],
1442 [ ' '-[] ], '$messages':translate_message(Error)