1/* Part of SWI-Prolog 2 3 Author: Jan Wielemaker and Sean Charles 4 E-mail: jan@swi-prolog.org and <sean at objitsu dot com> 5 WWW: http://www.swi-prolog.org 6 Copyright (c) 2013-2024, Sean Charles 7 SWI-Prolog Solutions b.v. 8 All rights reserved. 9 10 Redistribution and use in source and binary forms, with or without 11 modification, are permitted provided that the following conditions 12 are met: 13 14 1. Redistributions of source code must retain the above copyright 15 notice, this list of conditions and the following disclaimer. 16 17 2. Redistributions in binary form must reproduce the above copyright 18 notice, this list of conditions and the following disclaimer in 19 the documentation and/or other materials provided with the 20 distribution. 21 22 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 23 "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 24 LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 25 FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 26 COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 27 INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 28 BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 29 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 30 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 31 LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 32 ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 33 POSSIBILITY OF SUCH DAMAGE. 34 35 NOTE 36 37 The original code was subject to the MIT licence and written by 38 Sean Charles. Re-licenced to standard SWI-Prolog BSD-2 with 39 permission from Sean Charles. 40*/ 41 42:- module(redis, 43 [ redis_server/3, % +Alias, +Address, +Options 44 redis_connect/1, % -Connection 45 redis_connect/3, % -Connection, +Host, +Port 46 redis_disconnect/1, % +Connection 47 redis_disconnect/2, % +Connection, +Options 48 % Queries 49 redis/1, % +Request 50 redis/2, % +Connection, +Request 51 redis/3, % +Connection, +Request, -Reply 52 % High level queries 53 redis_get_list/3, % +Redis, +Key, -List 54 redis_get_list/4, % +Redis, +Key, +ChunkSize, -List 55 redis_set_list/3, % +Redis, +Key, +List 56 redis_get_hash/3, % +Redis, +Key, -Data:dict 57 redis_set_hash/3, % +Redis, +Key, +Data:dict 58 redis_scan/3, % +Redis, -LazyList, +Options 59 redis_sscan/4, % +Redis, +Set, -LazyList, +Options 60 redis_hscan/4, % +Redis, +Hash, -LazyList, +Options 61 redis_zscan/4, % +Redis, +Set, -LazyList, +Options 62 % Publish/Subscribe 63 redis_subscribe/4, % +Redis, +Channels, -Id, +Options 64 redis_subscribe/2, % +Id, +Channels 65 redis_unsubscribe/2, % +Id, +Channels 66 redis_current_subscription/2, % ?Id,?Channels 67 redis_write/2, % +Redis, +Command 68 redis_read/2, % +Redis, -Reply 69 % Building blocks 70 redis_array_dict/3, % ?Array, ?Tag, ?Dict 71 % Admin stuff 72 redis_property/2, % +Reply, ?Property 73 redis_current_command/2, % +Redis,?Command 74 redis_current_command/3, % +Redis, +Command, -Properties 75 76 sentinel_slave/4 % +ServerId, +Pool, -Slave, +Options 77 ]). 78:- autoload(library(socket), [tcp_connect/3]). 79:- autoload(library(apply), [maplist/2, convlist/3, maplist/3, maplist/5]). 80:- autoload(library(broadcast), [broadcast/1]). 81:- autoload(library(error), 82 [ must_be/2, 83 type_error/2, 84 instantiation_error/1, 85 uninstantiation_error/1, 86 existence_error/2, 87 existence_error/3 88 ]). 89:- autoload(library(lazy_lists), [lazy_list/2]). 90:- autoload(library(lists), [append/3, member/2]). 91:- autoload(library(option), [merge_options/3, option/2, 92 option/3, select_option/4]). 93:- autoload(library(pairs), [group_pairs_by_key/2]). 94:- autoload(library(time), [call_with_time_limit/2]). 95:- use_module(library(debug), [debug/3, assertion/1]). 96:- use_module(library(settings), [setting/4, setting/2]). 97:- if(exists_source(library(ssl))). 98:- autoload(library(ssl), [ssl_context/3, ssl_negotiate/5]). 99:- endif. 100 101:- use_foreign_library(foreign(redis4pl)). 102 103:- setting(max_retry_count, nonneg, 8640, % one day 104 "Max number of retries"). 105:- setting(max_retry_wait, number, 10, 106 "Max time to wait between recovery attempts"). 107:- setting(sentinel_timeout, number, 0.2, 108 "Time to wait for a sentinel"). 109 110:- predicate_options(redis_server/3, 3, 111 [ pass_to(redis:redis_connect/3, 3) 112 ]). 113:- predicate_options(redis_connect/3, 3, 114 [ reconnect(boolean), 115 user(atom), 116 password(atomic), 117 version(between(2,3)) 118 ]). 119:- predicate_options(redis_disconnect/2, 2, 120 [ force(boolean) 121 ]). 122:- predicate_options(redis_scan/3, 3, 123 [ match(atomic), 124 count(nonneg), 125 type(atom) 126 ]). 127% Actually not passing, but the same 128:- predicate_options(redis_sscan/4, 4, [pass_to(redis:redis_scan/3, 3)]). 129:- predicate_options(redis_hscan/4, 4, [pass_to(redis:redis_scan/3, 3)]). 130:- predicate_options(redis_zscan/4, 4, [pass_to(redis:redis_scan/3, 3)]).
151:- dynamic server/3. 152 153:- dynamic ( connection/2, % ServerName, Stream 154 sentinel/2 % Pool, Address 155 ) as volatile.
default
points at localhost:6379
with no connect options. The default
server is used for redis/1 and redis/2 and may be changed using this
predicate. Options are described with redis_connect/3.
Connections established this way are by default automatically
reconnected if the connection is lost for some reason unless a
reconnect(false)
option is specified.
169redis_server(Alias, Address, Options) :- 170 must_be(ground, Alias), 171 retractall(server(Alias, _, _)), 172 asserta(server(Alias, Address, Options)). 173 174server(default, localhost:6379, []).
redis_connect(+Address,
-Connection, +Options)
. redis_connect/1 is equivalent to
redis_connect(localhost:6379, Connection, [])
. Options:
true
, try to reconnect to the service when the connection
seems lost. Default is true
for connections specified using
redis_server/3 and false
for explictly opened connections.version(3)
and password(Password)
are specified, these
are used to authenticate using the HELLO command.3
, the HELLO command is used to upgrade the protocol.cacert
, key
and cert
options.sentinel(MasterName)
to enable contacting a network of Redis servers guarded by a
sentinel network.Instead of using these predicates, redis/2 and redis/3 are normally used with a server name argument registered using redis_server/3. These predicates are meant for creating a temporary paralel connection or using a connection with a blocking call.
230redis_connect(Conn) :- 231 redis_connect(default, Conn, []). 232 233redis_connect(Conn, Host, Port) :- 234 var(Conn), 235 ground(Host), ground(Port), 236 !, % GNU-Prolog compatibility 237 redis_connect(Host:Port, Conn, []). 238redis_connect(Server, Conn, Options) :- 239 atom(Server), 240 !, 241 ( server(Server, Address, DefaultOptions) 242 -> merge_options(Options, DefaultOptions, Options2), 243 do_connect(Server, Address, Conn, [address(Address)|Options2]) 244 ; existence_error(redis_server, Server) 245 ). 246redis_connect(Address, Conn, Options) :- 247 do_connect(Address, Address, Conn, [address(Address)|Options]).
redis_connection(Id, Stream, Failures, Options)
255do_connect(Id, sentinel(Pool), Conn, Options) => 256 sentinel_master(Id, Pool, Conn, Options). 257do_connect(Id, Address0, Conn, Options) => 258 tcp_address(Address0, Address), 259 tcp_connect(Address, Stream0, Options), 260 tls_upgrade(Address, Stream0, Stream, Options), 261 Conn = redis_connection(Id, Stream, 0, Options), 262 hello(Conn, Options). 263 264tcp_address(unix(Path), Path) :- 265 !. % Using an atom is ambiguous 266tcp_address(Address, Address).
tls(true)
is specified.272:- if(current_predicate(ssl_context/3)). 273tls_upgrade(Host:_Port, Raw, Stream, Options) :- 274 option(tls(true), Options), 275 !, 276 must_have_option(cacert(CacertFile), Options), 277 must_have_option(key(KeyFile), Options), 278 must_have_option(cert(CertFile), Options), 279 ssl_context(client, SSL, 280 [ host(Host), 281 certificate_file(CertFile), 282 key_file(KeyFile), 283 cacerts([file(CacertFile)]), 284 cert_verify_hook(tls_verify), 285 close_parent(true) 286 ]), 287 stream_pair(Raw, RawRead, RawWrite), 288 ssl_negotiate(SSL, RawRead, RawWrite, Read, Write), 289 stream_pair(Stream, Read, Write). 290:- endif. 291tls_upgrade(_, Stream, Stream, _). 292 293:- if(current_predicate(ssl_context/3)).
redis-cli
), we accept the
certificate as long as it is signed, not verifying the hostname.301:- public tls_verify/5. 302tls_verify(_SSL, _ProblemCert, _AllCerts, _FirstCert, verified) :- 303 !. 304tls_verify(_SSL, _ProblemCert, _AllCerts, _FirstCert, hostname_mismatch) :- 305 !. 306tls_verify(_SSL, _ProblemCert, _AllCerts, _FirstCert, _Error) :- 307 fail. 308 309:- endif.
315sentinel_master(Id, Pool, Master, Options) :- 316 sentinel_connect(Id, Pool, Conn, Options), 317 setting(sentinel_timeout, TMO), 318 call_cleanup( 319 query_sentinel(Pool, Conn, MasterAddr), 320 redis_disconnect(Conn)), 321 debug(redis(sentinel), 'Sentinel claims master is at ~p', [MasterAddr]), 322 do_connect(Id, MasterAddr, Master, Options), 323 debug(redis(sentinel), 'Connected to claimed master', []), 324 redis(Master, role, Role), 325 ( Role = [master|_Slaves] 326 -> debug(redis(sentinel), 'Verified role at ~p', [MasterAddr]) 327 ; redis_disconnect(Master), 328 debug(redis(sentinel), '~p is not the master: ~p', [MasterAddr, Role]), 329 sleep(TMO), 330 sentinel_master(Id, Pool, Master, Options) 331 ). 332 333sentinel_connect(Id, Pool, Conn, Options) :- 334 must_have_option(sentinels(Sentinels), Options), 335 sentinel_auth(Options, Options1), 336 setting(sentinel_timeout, TMO), 337 ( sentinel(Pool, Sentinel) 338 ; member(Sentinel, Sentinels) 339 ), 340 catch(call_with_time_limit( 341 TMO, 342 do_connect(Id, Sentinel, Conn, 343 [sentinel(true)|Options1])), 344 Error, 345 (print_message(warning, Error),fail)), 346 !, 347 debug(redis(sentinel), 'Connected to sentinel at ~p', [Sentinel]), 348 redis(Conn, sentinel(sentinels, Pool), Peers), 349 transaction(update_known_sentinels(Pool, Sentinel, Peers)). 350 351sentinel_auth(Options0, Options) :- 352 option(sentinel_user(User), Options0), 353 option(sentinel_password(Passwd), Options0), 354 !, 355 merge_options([user(User), password(Passwd)], Options0, Options). 356sentinel_auth(Options0, Options) :- 357 select_option(password(_), Options0, Options, _). 358 359 360query_sentinel(Pool, Conn, Host:Port) :- 361 redis(Conn, sentinel('get-master-addr-by-name', Pool), MasterData), 362 MasterData = [Host,Port]. 363 364update_known_sentinels(Pool, Sentinel, Peers) :- 365 retractall(sentinel(Pool, _)), 366 maplist(update_peer_sentinel(Pool), Peers), 367 asserta(sentinel(Pool, Sentinel)). 368 369update_peer_sentinel(Pool, Attrs), 370 memberchk(ip-Host, Attrs), 371 memberchk(port-Port, Attrs) => 372 asserta(sentinel(Pool, Host:Port)). 373 374must_have_option(Opt, Options) :- 375 option(Opt, Options), 376 !. 377must_have_option(Opt, Options) :- 378 existence_error(option, Opt, Options).
SENTINEL SLAVES mastername
387sentinel_slave(ServerId, Pool, Slave, Options) :-
388 sentinel_connect(ServerId, Pool, Conn, Options),
389 redis(Conn, sentinel(slaves, Pool), Slaves),
390 member(Pairs, Slaves),
391 dict_create(Slave, redis, Pairs).
398hello(Con, Options) :- 399 option(version(V), Options), 400 V >= 3, 401 !, 402 ( option(user(User), Options), 403 option(password(Password), Options) 404 -> redis(Con, hello(3, auth, User, Password)) 405 ; redis(Con, hello(3)) 406 ). 407hello(Con, Options) :- 408 option(password(Password), Options), 409 !, 410 redis(Con, auth(Password)). 411hello(_, _).
redis_connection(Id,Stream,Failures,Options)
. If the stream is
disconnected it will be reconnected.420redis_stream(Var, S, _) :- 421 ( var(Var) 422 -> !, instantiation_error(Var) 423 ; nonvar(S) 424 -> !, uninstantiation_error(S) 425 ). 426redis_stream(ServerName, S, Connect) :- 427 atom(ServerName), 428 !, 429 ( connection(ServerName, S0) 430 -> S = S0 431 ; Connect == true, 432 server(ServerName, Address, Options) 433 -> redis_connect(Address, Connection, Options), 434 redis_stream(Connection, S, false), 435 asserta(connection(ServerName, S)) 436 ; existence_error(redis_server, ServerName) 437 ). 438redis_stream(redis_connection(_,S0,_,_), S, _) :- 439 S0 \== (-), 440 !, 441 S = S0. 442redis_stream(Redis, S, _) :- 443 Redis = redis_connection(Id,-,_,Options), 444 option(address(Address), Options), 445 do_connect(Id,Address,Redis2,Options), 446 arg(2, Redis2, S0), 447 nb_setarg(2, Redis, S0), 448 S = S0. 449 450has_redis_stream(Var, _) :- 451 var(Var), 452 !, 453 instantiation_error(Var). 454has_redis_stream(Alias, S) :- 455 atom(Alias), 456 !, 457 connection(Alias, S). 458has_redis_stream(redis_connection(_,S,_,_), S) :- 459 S \== (-).
true
(default false
), do not raise any errors if
Connection does not exist or closing the connection raises
a network or I/O related exception. This version is used
internally if a connection is in a broken state, either due
to a protocol error or a network issue.475redis_disconnect(Redis) :- 476 redis_disconnect(Redis, []). 477 478redis_disconnect(Redis, Options) :- 479 option(force(true), Options), 480 !, 481 ( Redis = redis_connection(_Id, S, _, _Opts) 482 -> ( S == (-) 483 -> true 484 ; close(S, [force(true)]), 485 nb_setarg(2, Redis, -) 486 ) 487 ; has_redis_stream(Redis, S) 488 -> close(S, [force(true)]), 489 retractall(connection(_,S)) 490 ; true 491 ). 492redis_disconnect(Redis, _Options) :- 493 redis_stream(Redis, S, false), 494 close(S), 495 retractall(connection(_,S)).
redis(Connection, Command, _)
and second, it
can be used to exploit Redis pipelines and transactions. The
second form is acticated if Request is a list. In that case, each
element of the list is either a term Command -> Reply
or a simple
Command. Semantically this represents a sequence of redis/3 and
redis/2 calls. It differs in the following aspects:
multi
and the last exec
, the
commands are executed as a Redis transaction, i.e., they
are executed atomically.Procedurally, the process takes the following steps:
Command -> Reply
terms.Examples
?- redis(default, [ lpush(li,1), lpush(li,2), lrange(li,0,-1) -> List ]). List = ["2", "1"].
539redis(Redis, PipeLine) :- 540 is_list(PipeLine), 541 !, 542 redis_pipeline(Redis, PipeLine). 543redis(Redis, Req) :- 544 redis(Redis, Req, _).
"A:B:..."
. This is a common shorthand for
representing Redis keys.
Reply is either a plain term (often a variable) or a term Value as
Type
. In the latter form, Type dictates how the Redis bulk
reply is translated to Prolog. The default equals to auto
, i.e.,
as a number of the content satisfies the Prolog number syntax and
as an atom otherwise.
status(Atom)
Returned if the server replies with + Status
. Atom
is the textual value of Status converted to lower case,
e.g., status(ok)
or status(pong)
.nil
This atom is returned for a NIL/NULL value. Note that if
the reply is only nil
, redis/3 fails. The nil
value
may be embedded inside lists or maps.nil
. If Reply
as a whole would be nil
the call fails.
Redis bulk replies are translated depending on the as
Type as
explained above.
bytes
(iso_latin_1
), utf8
and text
(the
current locale translation).type_error(Type, String)
is raised.min_tagged_integer
and max_tagged_integer
, allowing
the value to be used as a dict key.auto(atom, number)
auto(atom,tagged_integer)
. This allows the value
to be used as a key for a SWI-Prolog dict.pairs
type
can also be applied to a Redis array. In this case the array
length must be even. This notably allows fetching a Redis
hash as pairs using HGETALL
using version 2 of the
Redis protocol.pairs(AsKey, AsValue)
, but convert the resulting
pair list into a SWI-Prolog dict. AsKey must convert to a
valid dict key, i.e., an atom or tagged integer. See dict_key
.dict(dict_key, AsValue)
.Here are some simple examples
?- redis(default, set(a, 42), X). X = status("OK"). ?- redis(default, get(a), X). X = "42". ?- redis(default, get(a), X as integer). X = 42. ?- redis(default, get(a), X as float). X = 42.0. ?- redis(default, set(swipl:version, 8)). true. ?- redis(default, incr(swipl:version), X). X = 9.
664redis(Redis, Req, Out) :- 665 out_val(Out, Val), 666 redis1(Redis, Req, Out), 667 Val \== nil. 668 669out_val(Out, Val) :- 670 ( nonvar(Out), 671 Out = (Val as _) 672 -> true 673 ; Val = Out 674 ). 675 676redis1(Redis, Req, Out) :- 677 Error = error(Formal, _), 678 catch(redis2(Redis, Req, Out), Error, true), 679 ( var(Formal) 680 -> true 681 ; recover(Error, Redis, redis1(Redis, Req, Out)) 682 ). 683 684redis2(Redis, Req, Out) :- 685 atom(Redis), 686 !, 687 redis_stream(Redis, S, true), 688 with_mutex(Redis, 689 ( redis_write_msg(S, Req), 690 redis_read_stream(Redis, S, Out) 691 )). 692redis2(Redis, Req, Out) :- 693 redis_stream(Redis, S, true), 694 redis_write_msg(S, Req), 695 redis_read_stream(Redis, S, Out).
699redis_pipeline(Redis, PipeLine) :- 700 Error = error(Formal, _), 701 catch(redis_pipeline2(Redis, PipeLine), Error, true), 702 ( var(Formal) 703 -> true 704 ; recover(Error, Redis, redis_pipeline(Redis, PipeLine)) 705 ). 706 707redis_pipeline2(Redis, PipeLine) :- 708 atom(Redis), 709 !, 710 redis_stream(Redis, S, true), 711 with_mutex(Redis, 712 redis_pipeline3(Redis, S, PipeLine)). 713redis_pipeline2(Redis, PipeLine) :- 714 redis_stream(Redis, S, true), 715 redis_pipeline3(Redis, S, PipeLine). 716 717redis_pipeline3(Redis, S, PipeLine) :- 718 maplist(write_pipeline(S), PipeLine), 719 flush_output(S), 720 read_pipeline(Redis, S, PipeLine). 721 722write_pipeline(S, Command -> _Reply) :- 723 !, 724 redis_write_msg_no_flush(S, Command). 725write_pipeline(S, Command) :- 726 redis_write_msg_no_flush(S, Command). 727 728read_pipeline(Redis, S, PipeLine) :- 729 E = error(Formal,_), 730 catch(read_pipeline2(Redis, S, PipeLine), E, true), 731 ( var(Formal) 732 -> true 733 ; reconnect_error(E) 734 -> redis_disconnect(Redis, [force(true)]), 735 throw(E) 736 ; resync(Redis), 737 throw(E) 738 ). 739 740read_pipeline2(Redis, S, PipeLine) :- 741 maplist(redis_read_msg3(S), PipeLine, Replies, Errors, Pushed), 742 maplist(handle_push(Redis), Pushed), 743 maplist(handle_error, Errors), 744 maplist(bind_reply, PipeLine, Replies). 745 746redis_read_msg3(S, _Command -> ReplyIn, Reply, Error, Push) :- 747 !, 748 redis_read_msg(S, ReplyIn, Reply, Error, Push). 749redis_read_msg3(S, Var, Reply, Error, Push) :- 750 redis_read_msg(S, Var, Reply, Error, Push). 751 752handle_push(Redis, Pushed) :- 753 handle_push_messages(Pushed, Redis). 754handle_error(Error) :- 755 ( var(Error) 756 -> true 757 ; throw(Error) 758 ). 759bind_reply(_Command -> Reply0, Reply) :- 760 !, 761 Reply0 = Reply. 762bind_reply(_Command, _).
771:- meta_predicate recover( , , ). 772 773recover(Error, Redis, Goal) :- 774 Error = error(Formal, _), 775 reconnect_error(Formal), 776 auto_reconnect(Redis), 777 !, 778 debug(redis(recover), '~p: got error ~p; trying to reconnect', 779 [Redis, Error]), 780 redis_disconnect(Redis, [force(true)]), 781 ( wait_to_retry(Redis, Error) 782 -> call(Goal), 783 retractall(failure(Redis, _)) 784 ; throw(Error) 785 ). 786recover(Error, _, _) :- 787 throw(Error). 788 789auto_reconnect(redis_connection(_,_,_,Options)) :- 790 !, 791 option(reconnect(true), Options). 792auto_reconnect(Server) :- 793 ground(Server), 794 server(Server, _, Options), 795 option(reconnect(true), Options, true). 796 797reconnect_error(io_error(_Action, _On)). 798reconnect_error(socket_error(_Code, _)). 799reconnect_error(syntax_error(unexpected_eof)). 800reconnect_error(existence_error(stream, _)).
max_retry_wait
. If the
setting max_retry_count
is exceeded we fail and the called signals
an exception.809:- dynamic failure/2 as volatile. 810 811wait_to_retry(Redis, Error) :- 812 redis_failures(Redis, Failures), 813 setting(max_retry_count, Count), 814 Failures < Count, 815 Failures2 is Failures+1, 816 redis_set_failures(Redis, Failures2), 817 setting(max_retry_wait, MaxWait), 818 Wait is min(MaxWait*100, 1<<Failures)/100.0, 819 debug(redis(recover), ' Sleeping ~p seconds', [Wait]), 820 retry_message_level(Failures, Level), 821 print_message(Level, redis(retry(Redis, Failures, Wait, Error))), 822 sleep(Wait). 823 824redis_failures(redis_connection(_,_,Failures0,_), Failures) :- 825 !, 826 Failures = Failures0. 827redis_failures(Server, Failures) :- 828 atom(Server), 829 ( failure(Server, Failures) 830 -> true 831 ; Failures = 0 832 ). 833 834redis_set_failures(Connection, Count) :- 835 compound(Connection), 836 !, 837 nb_setarg(3, Connection, Count). 838redis_set_failures(Server, Count) :- 839 atom(Server), 840 retractall(failure(Server, _)), 841 asserta(failure(Server, Count)). 842 843retry_message_level(0, warning) :- !. 844retry_message_level(_, silent).
853redis(Req) :-
854 setup_call_cleanup(
855 redis_connect(default, C, []),
856 redis1(C, Req, Out),
857 redis_disconnect(C)),
858 print(Out).
866redis_write(Redis, Command) :- 867 redis_stream(Redis, S, true), 868 redis_write_msg(S, Command). 869 870redis_read(Redis, Reply) :- 871 redis_stream(Redis, S, true), 872 redis_read_stream(Redis, S, Reply). 873 874 875 /******************************* 876 * HIGH LEVEL ACCESS * 877 *******************************/
LRANGE
requests. Note
that this results in O(N^2) complexity. Using a lazy list is most
useful for relatively short lists holding possibly large items.
Note that values retrieved are strings, unless the value was added
using Term as prolog
.
It seems possible for LLEN
to return OK
. I don't know why.
As a work-around we return the empty list rather than an error.
897redis_get_list(Redis, Key, List) :- 898 redis_get_list(Redis, Key, -1, List). 899 900redis_get_list(Redis, Key, Chunk, List) :- 901 redis(Redis, llen(Key), Len), 902 ( Len == status(ok) 903 -> List = [] 904 ; ( Chunk >= Len 905 ; Chunk == -1 906 ) 907 -> ( Len == 0 908 -> List = [] 909 ; End is Len-1, 910 list_range(Redis, Key, 0, End, List) 911 ) 912 ; lazy_list(rlist_next(s(Redis,Key,0,Chunk,Len)), List) 913 ). 914 915rlist_next(State, List, Tail) :- 916 State = s(Redis,Key,Offset,Slice,Len), 917 End is min(Len-1, Offset+Slice-1), 918 list_range(Redis, Key, Offset, End, Elems), 919 ( End =:= Len-1 920 -> List = Elems, 921 Tail = [] 922 ; Offset2 is Offset+Slice, 923 nb_setarg(3, State, Offset2), 924 append(Elems, Tail, List) 925 ). 926 927% Redis LRANGE demands End > Start and returns inclusive. 928 929list_range(DB, Key, Start, Start, [Elem]) :- 930 !, 931 redis(DB, lindex(Key, Start), Elem). 932list_range(DB, Key, Start, End, List) :- 933 !, 934 redis(DB, lrange(Key, Start, End), List).
[]
, Key is deleted. Note that key values
are always strings in Redis. The same conversion rules as for
redis/1-3 apply.
945redis_set_list(Redis, Key, List) :-
946 redis(Redis, del(Key), _),
947 ( List == []
948 -> true
949 ; Term =.. [rpush,Key|List],
950 redis(Redis, Term, _Count)
951 ).
HGETALL
command. If the Redis hash is not used by
other (non-Prolog) applications one may also consider using the
Term as prolog
syntax to store the Prolog dict as-is.964redis_get_hash(Redis, Key, Dict) :- 965 redis(Redis, hgetall(Key), Dict as dict(auto)). 966 967redis_set_hash(Redis, Key, Dict) :- 968 redis_array_dict(Array, _, Dict), 969 Term =.. [hset,Key|Array], 970 redis(Redis, del(Key), _), 971 redis(Redis, Term, _Count).
982redis_array_dict(Array, Tag, Dict) :- 983 nonvar(Array), 984 !, 985 array_to_pairs(Array, Pairs), 986 dict_pairs(Dict, Tag, Pairs). 987redis_array_dict(TwoList, Tag, Dict) :- 988 dict_pairs(Dict, Tag, Pairs), 989 pairs_to_array(Pairs, TwoList). 990 991array_to_pairs([], []) :- 992 !. 993array_to_pairs([NameS-Value|T0], [Name-Value|T]) :- 994 !, % RESP3 returns a map as pairs. 995 atom_string(Name, NameS), 996 array_to_pairs(T0, T). 997array_to_pairs([NameS,Value|T0], [Name-Value|T]) :- 998 atom_string(Name, NameS), 999 array_to_pairs(T0, T). 1000 1001pairs_to_array([], []) :- 1002 !. 1003pairs_to_array([Name-Value|T0], [NameS,Value|T]) :- 1004 atom_string(Name, NameS), 1005 pairs_to_array(T0, T).
SCAN
, SSCAN
, HSCAN
and ZSCAN` commands
into a lazy list. For redis_scan/3 and redis_sscan/4 the result is
a list of strings. For redis_hscan/4 and redis_zscan/4, the result
is a list of pairs. Options processed:
MATCH
subcommand, only returning matches for
Pattern.COUNT
subcommand, giving a hint to the size of the
chunks fetched.TYPE
subcommand, only returning answers of the
indicated type.1029redis_scan(Redis, LazyList, Options) :- 1030 scan_options([match,count,type], Options, Parms), 1031 lazy_list(scan_next(s(scan,Redis,0,Parms)), LazyList). 1032 1033redis_sscan(Redis, Set, LazyList, Options) :- 1034 scan_options([match,count,type], Options, Parms), 1035 lazy_list(scan_next(s(sscan(Set),Redis,0,Parms)), LazyList). 1036 1037redis_hscan(Redis, Hash, LazyList, Options) :- 1038 scan_options([match,count,type], Options, Parms), 1039 lazy_list(scan_next(s(hscan(Hash),Redis,0,Parms)), LazyList). 1040 1041redis_zscan(Redis, Set, LazyList, Options) :- 1042 scan_options([match,count,type], Options, Parms), 1043 lazy_list(scan_next(s(zscan(Set),Redis,0,Parms)), LazyList). 1044 1045scan_options([], _, []). 1046scan_options([H|T0], Options, [H,V|T]) :- 1047 Term =.. [H,V], 1048 option(Term, Options), 1049 !, 1050 scan_options(T0, Options, T). 1051scan_options([_|T0], Options, T) :- 1052 scan_options(T0, Options, T). 1053 1054 1055scan_next(State, List, Tail) :- 1056 State = s(Command,Redis,Cursor,Params), 1057 Command =.. CList, 1058 append(CList, [Cursor|Params], CList2), 1059 Term =.. CList2, 1060 redis(Redis, Term, [NewCursor,Elems0]), 1061 scan_pairs(Command, Elems0, Elems), 1062 ( NewCursor == 0 1063 -> List = Elems, 1064 Tail = [] 1065 ; nb_setarg(3, State, NewCursor), 1066 append(Elems, Tail, List) 1067 ). 1068 1069scan_pairs(hscan(_), List, Pairs) :- 1070 !, 1071 scan_pairs(List, Pairs). 1072scan_pairs(zscan(_), List, Pairs) :- 1073 !, 1074 scan_pairs(List, Pairs). 1075scan_pairs(_, List, List). 1076 1077scan_pairs([], []). 1078scan_pairs([Key,Value|T0], [Key-Value|T]) :- 1079 !, 1080 scan_pairs(T0, T). 1081scan_pairs([Key-Value|T0], [Key-Value|T]) :- 1082 scan_pairs(T0, T). 1083 1084 1085 /******************************* 1086 * ABOUT * 1087 *******************************/
1096redis_current_command(Redis, Command) :- 1097 redis_current_command(Redis, Command, _). 1098 1099redis_current_command(Redis, Command, Properties) :- 1100 nonvar(Command), 1101 !, 1102 redis(Redis, command(info, Command), [[_|Properties]]). 1103redis_current_command(Redis, Command, Properties) :- 1104 redis(Redis, command, Commands), 1105 member([Name|Properties], Commands), 1106 atom_string(Command, Name).
redis(info, String)
and parses the result. As this is for machine
usage, properties names *_human are skipped.1114redis_property(Redis, Property) :- 1115 redis(Redis, info, String), 1116 info_terms(String, Terms), 1117 member(Property, Terms). 1118 1119info_terms(Info, Pairs) :- 1120 split_string(Info, "\n", "\r\n ", Lines), 1121 convlist(info_line_term, Lines, Pairs). 1122 1123info_line_term(Line, Term) :- 1124 sub_string(Line, B, _, A, :), 1125 !, 1126 sub_atom(Line, 0, B, _, Name), 1127 \+ sub_atom(Name, _, _, 0, '_human'), 1128 sub_string(Line, _, A, 0, ValueS), 1129 ( number_string(Value, ValueS) 1130 -> true 1131 ; Value = ValueS 1132 ), 1133 Term =.. [Name,Value]. 1134 1135 1136 /******************************* 1137 * SUBSCRIBE * 1138 *******************************/
redis(Id, Channel, Data)
If redis_unsubscribe/2 removes the last subscription, the thread terminates.
To simply print the incomming messages use e.g.
?- listen(redis(_, Channel, Data), format('Channel ~p got ~p~n', [Channel,Data])). true. ?- redis_subscribe(default, test, Id, []). Id = redis_pubsub_3, ?- redis(publish(test, "Hello world")). Channel test got "Hello world" 1 true.
1168:- dynamic ( subscription/2, % Id, Channel 1169 listening/3 % Id, Connection, Thread 1170 ) as volatile. 1171 1172redis_subscribe(Redis, Spec, Id, Options) :- 1173 atom(Redis), 1174 !, 1175 channels(Spec, Channels), 1176 pubsub_thread_options(ThreadOptions, Options), 1177 thread_create(setup_call_cleanup( 1178 redis_connect(Redis, Conn, [reconnect(true)]), 1179 redis_subscribe1(Redis, Conn, Channels), 1180 redis_disconnect(Conn)), 1181 Thread, 1182 ThreadOptions), 1183 pubsub_id(Thread, Id). 1184redis_subscribe(Redis, Spec, Id, Options) :- 1185 channels(Spec, Channels), 1186 pubsub_thread_options(ThreadOptions, Options), 1187 thread_create(redis_subscribe1(Redis, Redis, Channels), 1188 Thread, 1189 ThreadOptions), 1190 pubsub_id(Thread, Id). 1191 1192pubsub_thread_options(ThreadOptions, Options) :- 1193 merge_options(Options, [detached(true)], ThreadOptions). 1194 1195pubsub_id(Thread, Thread). 1196%pubsub_id(Thread, Id) :- 1197% thread_property(Thread, id(TID)), 1198% atom_concat('redis_pubsub_', TID, Id). 1199 1200redis_subscribe1(Redis, Conn, Channels) :- 1201 Error = error(Formal, _), 1202 catch(redis_subscribe2(Redis, Conn, Channels), Error, true), 1203 ( var(Formal) 1204 -> true 1205 ; recover(Error, Conn, redis1(Conn, echo("reconnect"), _)), 1206 thread_self(Me), 1207 pubsub_id(Me, Id), 1208 findall(Channel, subscription(Id, Channel), CurrentChannels), 1209 redis_subscribe1(Redis, Conn, CurrentChannels) 1210 ). 1211 1212redis_subscribe2(Redis, Conn, Channels) :- 1213 redis_subscribe3(Conn, Channels), 1214 redis_listen(Redis, Conn). 1215 1216redis_subscribe3(Conn, Channels) :- 1217 thread_self(Me), 1218 pubsub_id(Me, Id), 1219 prolog_listen(this_thread_exit, pubsub_clean(Id)), 1220 maplist(register_subscription(Id), Channels), 1221 redis_stream(Conn, S, true), 1222 Req =.. [subscribe|Channels], 1223 redis_write_msg(S, Req). 1224 1225pubsub_clean(Id) :- 1226 retractall(listening(Id, _Connection, _Thread)), 1227 retractall(subscription(Id, _Channel)).
1239redis_subscribe(Id, Spec) :- 1240 channels(Spec, Channels), 1241 ( listening(Id, Connection, _Thread) 1242 -> true 1243 ; existence_error(redis_pubsub, Id) 1244 ), 1245 maplist(register_subscription(Id), Channels), 1246 redis_stream(Connection, S, true), 1247 Req =.. [subscribe|Channels], 1248 redis_write_msg(S, Req). 1249 1250redis_unsubscribe(Id, Spec) :- 1251 channels(Spec, Channels), 1252 ( listening(Id, Connection, _Thread) 1253 -> true 1254 ; existence_error(redis_pubsub, Id) 1255 ), 1256 maplist(unregister_subscription(Id), Channels), 1257 redis_stream(Connection, S, true), 1258 Req =.. [unsubscribe|Channels], 1259 redis_write_msg(S, Req).
1265redis_current_subscription(Id, Channels) :- 1266 findall(Id-Channel, subscription(Id, Channel), Pairs), 1267 keysort(Pairs, Sorted), 1268 group_pairs_by_key(Sorted, Grouped), 1269 member(Id-Channels, Grouped). 1270 1271channels(Spec, List) :- 1272 is_list(Spec), 1273 !, 1274 maplist(channel_name, Spec, List). 1275channels(Ch, [Key]) :- 1276 channel_name(Ch, Key). 1277 1278channel_name(Atom, Atom) :- 1279 atom(Atom), 1280 !. 1281channel_name(Key, Atom) :- 1282 phrase(key_parts(Key), Parts), 1283 !, 1284 atomic_list_concat(Parts, :, Atom). 1285channel_name(Key, _) :- 1286 type_error(redis_key, Key). 1287 1288key_parts(Var) --> 1289 { var(Var), !, fail }. 1290key_parts(Atom) --> 1291 { atom(Atom) }, 1292 !, 1293 [Atom]. 1294key_parts(A:B) --> 1295 key_parts(A), 1296 key_parts(B). 1297 1298 1299 1300 1301register_subscription(Id, Channel) :- 1302 ( subscription(Id, Channel) 1303 -> true 1304 ; assertz(subscription(Id, Channel)) 1305 ). 1306 1307unregister_subscription(Id, Channel) :- 1308 retractall(subscription(Id, Channel)). 1309 1310redis_listen(Redis, Conn) :- 1311 thread_self(Me), 1312 pubsub_id(Me, Id), 1313 setup_call_cleanup( 1314 assertz(listening(Id, Conn, Me), Ref), 1315 redis_listen_loop(Redis, Id, Conn), 1316 erase(Ref)). 1317 1318redis_listen_loop(Redis, Id, Conn) :- 1319 redis_stream(Conn, S, true), 1320 ( subscription(Id, _) 1321 -> redis_read_stream(Redis, S, Reply), 1322 redis_broadcast(Redis, Reply), 1323 redis_listen_loop(Redis, Id, Conn) 1324 ; true 1325 ). 1326 1327redis_broadcast(_, [subscribe, _Channel, _N]) :- 1328 !. 1329redis_broadcast(Redis, [message, Channel, Data]) :- 1330 !, 1331 catch(broadcast(redis(Redis, Channel, Data)), 1332 Error, 1333 print_message(error, Error)). 1334redis_broadcast(Redis, Message) :- 1335 assertion((Message = [Type, Channel, _Data], 1336 atom(Type), 1337 atom(Channel))), 1338 debug(redis(warning), '~p: Unknown message while listening: ~p', 1339 [Redis,Message]). 1340 1341 1342 /******************************* 1343 * READ/WRITE * 1344 *******************************/
nil
status(String)
true
or false
). RESP3 only.If something goes wrong, the connection is closed and an exception is raised.
1361redis_read_stream(Redis, SI, Out) :- 1362 E = error(Formal,_), 1363 catch(redis_read_msg(SI, Out, Out0, Error, Push), E, true), 1364 ( var(Formal) 1365 -> handle_push_messages(Push, Redis), 1366 ( var(Error) 1367 -> Out = Out0 1368 ; resync(Redis), 1369 throw(Error) 1370 ) 1371 ; redis_disconnect(Redis, [force(true)]), 1372 throw(E) 1373 ). 1374 1375handle_push_messages([], _). 1376handle_push_messages([H|T], Redis) :- 1377 ( catch(handle_push_message(H, Redis), E, 1378 print_message(warning, E)) 1379 -> true 1380 ; true 1381 ), 1382 handle_push_messages(T, Redis). 1383 1384handle_push_message(["pubsub"|List], Redis) :- 1385 redis_broadcast(Redis, List). 1386% some protocol version 3 push messages (such as 1387% __keyspace@* events) seem to come directly 1388% without a pubsub header 1389handle_push_message([message|List], Redis) :- 1390 redis_broadcast(Redis, [message|List]).
1400resync(Redis) :- 1401 E = error(Formal,_), 1402 catch(do_resync(Redis), E, true), 1403 ( var(Formal) 1404 -> true 1405 ; redis_disconnect(Redis, [force(true)]) 1406 ). 1407 1408do_resync(Redis) :- 1409 A is random(1_000_000_000), 1410 redis_stream(Redis, S, true), 1411 redis_write_msg(S, echo(A)), 1412 catch(call_with_time_limit(0.2, '$redis_resync'(S, A)), 1413 time_limit_exceeded, 1414 throw(error(time_limit_exceeded,_))).
redis4pl
.
1429 /******************************* 1430 * MESSAGES * 1431 *******************************/ 1432 1433:- multifile 1434 prolog:error_message//1, 1435 prolog:message//1. 1436 1437prologerror_message(redis_error(Code, String)) --> 1438 [ 'REDIS: ~w: ~s'-[Code, String] ]. 1439 1440prologmessage(redis(retry(_Redis, _Failures, Wait, Error))) --> 1441 [ 'REDIS: connection error. Retrying in ~2f seconds'-[Wait], nl ], 1442 [ ' '-[] ], '$messages':translate_message(Error)
Redis client
This library is a client to Redis, a popular key value store to deal with caching and communication between micro services.
In the typical use case we register the details of one or more Redis servers using redis_server/3. Subsequenly, redis/2-3 is used to issue commands on the server. For example:
*/