/*  Part of SWI-Prolog

    Author:        Jeffrey Rosenwald, Jan Wielemaker
    E-mail:        jeffrose@acm.org
    WWW:           http://www.swi-prolog.org
    Copyright (c)  2009-2019, Jeffrey Rosenwald
                              CWI, Amsterdam
    All rights reserved.

    Redistribution and use in source and binary forms, with or without
    modification, are permitted provided that the following conditions
    are met:

    1. Redistributions of source code must retain the above copyright
       notice, this list of conditions and the following disclaimer.

    2. Redistributions in binary form must reproduce the above copyright
       notice, this list of conditions and the following disclaimer in
       the documentation and/or other materials provided with the
       distribution.

    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
    POSSIBILITY OF SUCH DAMAGE.
*/

:- module(paxos,
          [ paxos_get/1,                % ?Term
            paxos_get/2,                % +Key, -Value
            paxos_get/3,                % +Key, -Value, +Options
            paxos_set/1,                % ?Term
            paxos_set/2,                % +Key, +Value
            paxos_set/3,                % +Key, +Value, +Options
            paxos_on_change/2,          % ?Term, +Goal
            paxos_on_change/3,          % ?Key, ?Value, +Goal

            paxos_initialize/1,         % +Options

            paxos_admin_key/2,          % ?Name, ?Key
            paxos_property/1,           % ?Property
            paxos_quorum_ask/4,         % ?Templ, +Msg, -Result, +Options
                                        % Hook support
            paxos_replicate_key/3       % +Nodes, ?Key, +Options
          ]).
:- autoload(library(apply),[partition/4,maplist/3]).
:- autoload(library(broadcast),
            [ listen/3,
              broadcast_request/1,
              broadcast/1,
              unlisten/1,
              listen/2,
              unlisten/2
            ]).
:- use_module(library(debug),[debug/3]).
:- autoload(library(error),
            [permission_error/3,resource_error/1,must_be/2]).
:- autoload(library(lists),[select/3,nth1/3,max_list/2,member/2]).
:- autoload(library(option),[option/2,option/3]).
:- autoload(library(solution_sequences),[call_nth/2]).
:- use_module(library(settings),[setting/4,setting/2]).
:- meta_predicate
    paxos_on_change(?, 0),
    paxos_on_change(?, ?, 0).

:- multifile
    paxos_message_hook/3,               % +PaxOS, +TimeOut, -Message
    paxos_ledger_hook/5.                % +Op, ?Key, ?Gen, ?Value, ?Status

:- setting(max_sets, nonneg, 20,
           "Max Retries to get to an agreement").
:- setting(max_gets, nonneg, 5,
           "Max Retries to get a value from the forum").
:- setting(response_timeout, float, 0.020,
           "Max time to wait for a response").
:- setting(replication_rate, number, 1000,
           "Number of keys replicated per second").
:- setting(death_half_life, number, 10,
           "Half-time for failure score").
:- setting(death_score, number, 100,
           "Consider a node dead if cumulative failure \c
            score exceeds this number").
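If paxos_initialize/1 is not called explicitly, the equivalent of paxos_initialize([]) is executed lazily as part of the first paxos operation. Defined options: node(NodeID), the identifier of this node. NodeID must be a small non-negative integer as these identifiers are used in bitmaps.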
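Because node sets such as the quorum are kept as integer bitmaps with bit NodeID set for each member, adding, removing and testing members reduces to bitwise arithmetic. The following is a minimal sketch of that representation. The predicate names set_add/3, set_del/3, set_member/2 and set_nodes/2 are hypothetical helpers introduced for illustration and are not part of this library.

```prolog
% Node sets as integer bitmaps: bit N is set iff node N is a member.
% All predicate names here are illustrative, not part of library(paxos).

% set_add(+Set0, +Node, -Set): add Node to the bitmap Set0.
set_add(Set0, Node, Set) :-
    Set is Set0 \/ (1<<Node).

% set_del(+Set0, +Node, -Set): remove Node from the bitmap Set0.
set_del(Set0, Node, Set) :-
    Set is Set0 /\ \(1<<Node).

% set_member(+Node, +Set): true if the bit for Node is set.
set_member(Node, Set) :-
    Set /\ (1<<Node) =\= 0.

% set_nodes(+Set, -Nodes): list the members, lowest node id first.
set_nodes(0, []) :- !.
set_nodes(Set, [Node|Nodes]) :-
    Node is lsb(Set),
    Rest is Set /\ \(1<<Node),
    set_nodes(Rest, Nodes).
```

For example, adding node 3 to the empty set yields the integer 8, which is why node ids must stay small.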
:- dynamic  paxos_initialized/0.
:- volatile paxos_initialized/0.

paxos_initialize(_Options) :-
    paxos_initialized,
    !.
paxos_initialize(Options) :-
    with_mutex(paxos, paxos_initialize_sync(Options)).

paxos_initialize_sync(_Options) :-
    paxos_initialized,
    !.
paxos_initialize_sync(Options) :-
    at_halt(paxos_leave),
    listen(paxos, paxos(X), paxos_message(X)),
    paxos_assign_node(Options),
    start_replicator,
    asserta(paxos_initialized).

paxos_initialize :-
    paxos_initialize([]).


                 /*******************************
                 *             ADMIN            *
                 *******************************/
paxos_admin_key(quorum, '$paxos_quorum').
paxos_admin_key(dead,   '$paxos_dead_nodes').

paxos_get_admin(Name, Value) :-
    paxos_admin_key(Name, Key),
    paxos_get(Key, Value).

paxos_set_admin(Name, Value) :-
    paxos_admin_key(Name, Key),
    paxos_set(Key, Value).

paxos_set_admin_bg(Name, Value) :-
    thread_create(ignore(paxos_set_admin(Name, Value)), _,
                  [ detached(true)
                  ]).


                 /*******************************
                 *           NODE DATA          *
                 *******************************/
:- dynamic
    node/1,                             % NodeID
    quorum/1,                           % Bitmap
    failed/1,                           % Bitmap
    failed/3,                           % NodeID, LastTried, Score
    leaving/0,                          % Node is leaving
    dead/1,                             % Bitmap
    salt/1.                             % Unique key
:- volatile
    node/1,
    quorum/1,
    failed/1,
    failed/3,
    leaving/0,
    dead/1,
    salt/1.
paxos_assign_node(Options) :-
    (   option(node(Node), Options)
    ->  node(Node)
    ;   node(_)
    ),                                          % already done
    !.
paxos_assign_node(Options) :-
    between(1, 20, Retry),
    option(node(Node), Options, Node),
    (   node(_)
    ->  permission_error(set, paxos_node, Node)
    ;   true
    ),
    retractall(dead(_)),
    retractall(quorum(_)),
    retractall(failed(_)),
    retractall(failed(_,_,_)),
    retractall(leaving),
    Salt is random(1<<63),
    asserta(salt(Salt)),
    paxos_message(node(N,Q,D):From, 0.25, NodeQuery),
    findall(t(N,Q,D,From),
            broadcast_request(NodeQuery),
            Network),
    select(t(self,0,Salt,Me), Network, AllNodeStatus),
    partition(starting, AllNodeStatus, Starting, Running),
    nth_starting(Starting, Salt, Offset),
    retractall(salt(_)),
    debug(paxos(node), 'Me@~p; starting: ~p; running: ~p',
          [Me, Starting, Running]),
    arg_union(2, Running, Quorum),
    arg_union(3, Running, Dead),
    (   var(Node)
    ->  (   call_nth(( between(0, 1000, Node),
                       \+ memberchk(t(Node,_,_,_), Running),
                       Dead /\ (1<<Node) =:= 0),
                     Offset)
        ->  debug(paxos(node), 'Assigning myself node ~d', [Node])
        ;   resource_error(paxos_nodes)
        )
    ;   memberchk(t(Node,_,_,_), Running)
    ->  permission_error(set, paxos_node, Node)
    ;   Rejoin = true
    ),
    asserta(node(Node)),
    (   claim_node(Node, Me)
    ->  !,
        asserta(dead(Dead)),
        set_quorum(Node, Quorum),
        (   Rejoin == true
        ->  paxos_rejoin
        ;   true
        )
    ;   debug(paxos(node), 'Node ~p already claimed; retrying (~p)',
              [Node, Retry]),
        retractall(node(Node)),
        fail
    ).

starting(t(self,_Quorum,_Salt,_Address)).

nth_starting(Starting, Salt, N) :-
    maplist(arg(3), Starting, Salts),
    sort([Salt|Salts], Sorted),
    nth1(N, Sorted, Salt),
    !.

claim_node(Node, Me) :-
    paxos_message(claim_node(Node, Ok):From, 0.25, NodeQuery),
    forall((   broadcast_request(NodeQuery),
               From \== Me,
               debug(paxos(node), 'Claim ~p ~p: ~p', [Node, From, Ok])
           ),
           Ok == true).

set_quorum(Node, Quorum0) :-
    Quorum is Quorum0 \/ (1<<Node),
    debug(paxos(node), 'Adding ~d to quorum (now 0x~16r)', [Node, Quorum]),
    asserta(quorum(Quorum)),
    paxos_set_admin(quorum, Quorum).
paxos_rejoin :-
    node(Node),
    repeat,
        (   paxos_get_admin(dead, Dead0)
        ->  Dead is Dead0 /\ \(1<<Node),
            (   Dead == Dead0
            ->  true
            ;   paxos_set_admin(dead, Dead)
            )
        ;   true
        ),
    !.
paxos_leave :-
    node(Node),
    !,
    asserta(leaving),
    paxos_leave(Node),
    Set is 1<<Node,
    paxos_message(forget(Set), -, Forget),
    broadcast(Forget),
    unlisten(paxos),
    retractall(leaving).
paxos_leave.

paxos_leave(Node) :-
    !,
    paxos_update_set(quorum, del(Node)),
    paxos_update_set(dead,   add(Node)).
paxos_leave(_).

paxos_update_set(Set, How) :-
    repeat,
      Term =.. [Set,Value],
      call(Term),
      (   How = add(Node)
      ->  NewValue is Value \/  (1<<Node)
      ;   How = del(Node)
      ->  NewValue is Value /\ \(1<<Node)
      ),
      (   Value == NewValue
      ->  true
      ;   paxos_set_admin(Set, NewValue)
      ->  true
      ;   leaving
      ),
    !.

                 /*******************************
                 *          NODE STATUS         *
                 *******************************/
update_failed(Action, Quorum, Alive) :-
    Failed is Quorum /\ \Alive,
    alive(Alive),
    consider_dead(Failed),
    (   failed(Failed)
    ->  true
    ;   (   clause(failed(_Old), true, Ref)
        ->  asserta(failed(Failed)),
            erase(Ref),
            debug(paxos(node), 'Updated failed quorum to 0x~16r', [Failed])
        ;   asserta(failed(Failed))
        ),
        (   Action == set
        ->  start_replicator
        ;   true
        )
    ).

consider_dead(0) :-
    !.
consider_dead(Failed) :-
    Node is lsb(Failed),
    consider_dead1(Node),
    Rest is Failed /\ \(1<<Node),
    consider_dead(Rest).

consider_dead1(Node) :-
    clause(failed(Node, Last, Score), true, Ref),
    !,
    setting(death_half_life, HalfLife),
    setting(death_score, DeathScore),
    get_time(Now),
    Passed is Now-Last,
    NewScore is Score*(2**(-Passed/HalfLife)) + 10,
    asserta(failed(Node, Now, NewScore)),
    erase(Ref),
    (   NewScore > DeathScore           % dead once the score exceeds death_score
    ->  debug(paxos(node), 'Consider node ~d dead', [Node]),
        paxos_leave(Node)
    ;   true
    ).
consider_dead1(Node) :-
    get_time(Now),
    asserta(failed(Node, Now, 10)).

alive(Bitmap) :-
    (   clause(failed(Node, _Last, _Score), true, Ref),
        Bitmap /\ (1<<Node) =\= 0,
        erase(Ref),
        fail
    ;   true
    ).
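The failure score used by consider_dead1/1 decays exponentially with the configured half-life and gains 10 on every observed failure. The sketch below isolates that arithmetic so the effect of the death_half_life and death_score settings can be explored without a running network. The predicates next_score/4 and failures_until_dead/4 are hypothetical names introduced here, the limit of 1000 iterations is an arbitrary assumption, and a float base (2.0) is used so that the result is always a float.

```prolog
% next_score(+Score, +Passed, +HalfLife, -NewScore)
% Decay Score over Passed seconds with the given half-life, then add
% the penalty of 10 applied for each observed failure.
next_score(Score, Passed, HalfLife, NewScore) :-
    NewScore is Score*(2.0**(-Passed/HalfLife)) + 10.

% failures_until_dead(+Interval, +HalfLife, +DeathScore, -Count)
% Count is the number of consecutive failures, Interval seconds apart,
% after which the score exceeds DeathScore. The first failure starts
% the score at 10. Fails if the score never exceeds DeathScore within
% 1000 failures (illustrative bound).
failures_until_dead(Interval, HalfLife, DeathScore, Count) :-
    count_failures(10, 1, Interval, HalfLife, DeathScore, Count).

count_failures(Score, N, _, _, DeathScore, N) :-
    Score > DeathScore,
    !.
count_failures(Score, N0, Interval, HalfLife, DeathScore, N) :-
    N0 < 1000,                          % give up: score has converged
    next_score(Score, Interval, HalfLife, Score1),
    N1 is N0+1,
    count_failures(Score1, N1, Interval, HalfLife, DeathScore, N).
```

With a death score of 100 and no time between failures, failures_until_dead(0, 10, 100, N) gives N = 11. When failures are a full half-life apart the score converges to 20, so the same threshold is never reached and the goal fails.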
life_quorum(Quorum, LifeQuorum) :-
    quorum(Quorum),
    (   failed(Failed),
        Failed \== 0,
        LifeQuorum is Quorum /\ \Failed,
        majority(LifeQuorum, Quorum)
    ->  true
    ;   LifeQuorum = Quorum
    ).


                 /*******************************
                 *        NETWORK STATUS        *
                 *******************************/

:- paxos_admin_key(quorum, Key),
   listen(paxos_changed(Key, Quorum),
          update_quorum(Quorum)).
:- paxos_admin_key(dead, Key),
   listen(paxos_changed(Key, Death),
          update_dead(Death)).

update_quorum(Proposed) :-
    debug(paxos(node), 'Received quorum proposal 0x~16r', [Proposed]),
    quorum(Proposed),
    !.
update_quorum(Proposed) :-
    leaving,
    !,
    update(quorum(Proposed)).
update_quorum(Proposed) :-
    node(Node),
    Proposed /\ (1<<Node) =\= 0,
    !,
    update(quorum(Proposed)).
update_quorum(Proposed) :-
    node(Node),
    NewQuorum is Proposed \/ (1<<Node),
    update(quorum(NewQuorum)),
    debug(paxos(node), 'I''m not in the quorum! Proposing 0x~16r', [NewQuorum]),
    paxos_set_admin_bg(quorum, NewQuorum).

update_dead(Proposed) :-
    debug(paxos(node), 'Received dead proposal 0x~16r', [Proposed]),
    dead(Proposed),
    !.
update_dead(Proposed) :-
    leaving,
    !,
    update(dead(Proposed)).
update_dead(Proposed) :-
    node(Node),
    Proposed /\ (1<<Node) =:= 0,
    !,
    update(dead(Proposed)).
update_dead(Proposed) :-
    node(Node),
    NewDead is Proposed /\ \(1<<Node),
    update(dead(NewDead)),
    paxos_set_admin_bg(dead, NewDead).

update(Clause) :-
    functor(Clause, Name, Arity),
    functor(Generic, Name, Arity),
    (   clause(Generic, true, Ref)
    ->  asserta(Clause),
        erase(Ref)
    ;   asserta(Clause)
    ).
paxos_property(node(NodeID)) :-
    node(NodeID).
paxos_property(quorum(Quorum)) :-
    quorum(Quorum).
paxos_property(failed(Nodes)) :-
    failed(Nodes).


                 /*******************************
                 *         INBOUND EVENTS       *
                 *******************************/
A prepare message returns the generation at which we have a value for the key, or 0 for Gen if the key was not known, and our node id for Node. An accept or learn message that is rejected is answered with nack.

paxos_message(prepare(Key,Node,Gen,Value)) :-
    node(Node),
    (   ledger(Key, Gen, _)
    ->  true
    ;   Gen = 0,
        ledger_create(Key, Gen, Value)
    ),
    debug(paxos, 'Prepared ~p-~p@~d', [Key,Value,Gen]).
paxos_message(accept(Key,Node,Gen,GenA,Value)) :-
    node(Node),
    (   ledger_update(Key, Gen, Value)
    ->  debug(paxos, 'Accepted ~p-~p@~d', [Key,Value,Gen]),
        GenA = Gen
    ;   debug(paxos, 'Rejected ~p-~p@~d', [Key,Value,Gen]),
        GenA = nack
    ).
paxos_message(changed(Key,Gen,Value,Acceptors)) :-
    debug(paxos, 'Changed ~p-~p@~d for ~p', [Key, Value, Gen, Acceptors]),
    ledger_update_holders(Key,Gen,Acceptors),
    broadcast(paxos_changed(Key,Value)).
paxos_message(learn(Key,Node,Gen,GenA,Value)) :-
    node(Node),
    debug(paxos, 'Learn ~p-~p@~p?', [Key, Value, Gen]),
    (   ledger_learn(Key,Gen,Value)
    ->  debug(paxos, 'Learned ~p-~p@~d', [Key,Value,Gen]),
        GenA = Gen
    ;   debug(paxos, 'Rejected ~p@~d', [Key, Gen]),
        GenA = nack
    ).
paxos_message(learned(Key,Gen,_Value,Acceptors)) :-
    ledger_update_holders(Key,Gen,Acceptors).
paxos_message(retrieve(Key,Node,K,Value)) :-
    node(Node),
    debug(paxos, 'Retrieving ~p', [Key]),
    ledger(Key,K,Value),
    debug(paxos, 'Retrieved ~p-~p@~d', [Key,Value,K]),
    !.
paxos_message(forget(Nodes)) :-
    ledger_forget(Nodes).
paxos_message(node(Node,Quorum,Dead)) :-
    (   node(Node),
        quorum(Quorum),
        dead(Dead)
    ->  true
    ;   salt(Salt),
        Node = self,
        Quorum = 0,
        Dead = Salt
    ).
paxos_message(claim_node(Node, Ok)) :-
    (   node(Node)
    ->  Ok = false
    ;   Ok = true
    ).
paxos_message(ask(Node, Message)) :-
    node(Node),
    broadcast_request(Message).


                 /*******************************
                 *     KEY-VALUE OPERATIONS     *
                 *******************************/
paxos_set(Term) is equivalent to paxos_key(Term,Key), paxos_set(Key,Term). I.e., Term is a ground compound term and its key is the name/arity pair. This version provides compatibility with older versions of this library.

On success, paxos_set/1 will also broadcast the term paxos_changed(Key,Value) to the quorum.
Options processed:

retry(Retries): number of retries performed before a set is abandoned. Defaults to the setting max_sets (20).
timeout(Seconds): maximum time to wait for a reply. Defaults to the setting response_timeout (0.020, 20ms).

paxos_set(Term) :-
    paxos_key(Term, Key),
    paxos_set(Key, Term, []).

paxos_set(Key, Value) :-
    paxos_set(Key, Value, []).

paxos_set(Key, Value, Options) :-
    must_be(ground, Key-Value),
    paxos_initialize,
    option(retry(Retries), Options, Retries),
    option(timeout(TMO), Options, TMO),
    apply_default(Retries, max_sets),
    apply_default(TMO, response_timeout),
    paxos_message(prepare(Key,Np,Rp,Value), TMO, Prepare),
    between(0, Retries, _),
      life_quorum(Quorum, Alive),
      Alive \== 0,
      debug(paxos, 'Set: ~p -> ~p', [Key, Value]),
      collect(Quorum, false, Np, Rp, Prepare, Rps, PrepNodes),
      debug(paxos, 'Set: quorum: 0x~16r, prepared by 0x~16r, gens ~p',
            [Quorum, PrepNodes, Rps]),
      majority(PrepNodes, Quorum),
      max_list(Rps, K),
      succ(K, K1),
      paxos_message(accept(Key,Na,K1,Ra,Value), TMO, Accept),
      collect(Alive, Ra == nack, Na, Ra, Accept, Ras, AcceptNodes),
      majority(AcceptNodes, Quorum),
      intersecting(PrepNodes, AcceptNodes),
      c_element(Ras, K, K1),
      broadcast(paxos(log(Key,Value,AcceptNodes,K1))),
      paxos_message(changed(Key,K1,Value,AcceptNodes), -, Changed),
      broadcast(Changed),
      update_failed(set, Quorum, AcceptNodes),
    !.

apply_default(Var, Setting) :-
    var(Var),
    !,
    setting(Setting, Var).
apply_default(_, _).

majority(SubSet, Set) :-
    popcount(SubSet) >= (popcount(Set)+2)//2.

intersecting(Set1, Set2) :-
    Set1 /\ Set2 =\= 0.
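The majority test above compares population counts of two node bitmaps. As a small worked illustration, the sketch below computes the reply threshold for a quorum of N nodes and applies it to bitmaps. The names majority_threshold/2 and is_majority/2 are hypothetical and only restate the arithmetic of majority/2 in a form that can be loaded on its own.

```prolog
% majority_threshold(+N, -Threshold)
% Smallest number of replies that counts as a majority of N nodes.
majority_threshold(N, Threshold) :-
    Threshold is (N+2)//2.

% is_majority(+SubSet, +Set)
% True if the bitmap SubSet contains at least the threshold number of
% nodes for the bitmap Set.
is_majority(SubSet, Set) :-
    N is popcount(Set),
    majority_threshold(N, Threshold),
    popcount(SubSet) >= Threshold.
```

A quorum of one needs one reply, quorums of two and three need two, and quorums of four and five need three. For the three-node quorum 7 (nodes 0, 1 and 2), the replies 3 (nodes 0 and 1) form a majority while the single reply 1 does not.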
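collect/7 performs a broadcast request using Message. Result is the list of instantiations received for Template and NodeSet is the bitmap of the nodes that replied, i.e. the number of nodes in NodeSet is length(Result). The transfer stops if all members of the set Quorum responded or the configured timeout passed.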
collect(Quorum, Stop, Node, Template, Message, Result, NodeSet) :-
    State = state(0),
    L0 = [dummy|_],
    Answers = list(L0),
    (   broadcast_request(Message),
        debug(paxos(request), 'broadcast_request: ~p', [Message]),
        (   Stop
        ->  !,
            fail
        ;   true
        ),
        duplicate_term(Template, Copy),
        NewLastCell = [Copy|_],
        arg(1, Answers, LastCell),
        nb_linkarg(2, LastCell, NewLastCell),
        nb_linkarg(1, Answers, NewLastCell),
        arg(1, State, Replied0),
        Replied is Replied0 \/ (1<<Node),
        nb_setarg(1, State, Replied),
        Quorum /\ Replied =:= Quorum
    ->  true
    ;   true
    ),
    arg(1, State, NodeSet),
    arg(1, Answers, [_]),               % close the answer list
    L0 = [_|Result].
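paxos_quorum_ask/4 processes the options node(Node), quorum(Quorum) and timeout(Seconds). The timeout defaults to the setting response_timeout.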
paxos_quorum_ask(Template, Message, Result, Options) :-
    option(timeout(TMO), Options, TMO),
    option(node(Node), Options, _),
    option(quorum(Quorum), Options, Quorum),
    apply_default(TMO, response_timeout),
    (   var(Quorum)
    ->  life_quorum(Quorum, _Alive)
    ;   true
    ),
    paxos_message(ask(Node, Message), TMO, BroadcastMessage),
    collect(Quorum, false, Node, Template, BroadcastMessage, Result, _PrepNodes).
paxos_get(Term) is equivalent to paxos_key(Term,Key), paxos_get(Key,Term). I.e., Term is a compound term and its key is the name/arity pair. This version provides compatibility with older versions of this library.

Options processed:
retry(Retries): number of retries performed before a get is abandoned. Defaults to the setting max_gets (5).
timeout(Seconds): maximum time to wait for a reply. Defaults to the setting response_timeout (0.020, 20ms).

paxos_get(Term) :-
    paxos_key(Term, Key),
    paxos_get(Key, Term, []).
paxos_get(Key, Value) :-
    paxos_get(Key, Value, []).

paxos_get(Key, Value, _) :-
    ledger(Key, _Line, Value),
    !.
paxos_get(Key, Value, Options) :-
    paxos_initialize,
    option(retry(Retries), Options, Retries),
    option(timeout(TMO), Options, TMO),
    apply_default(Retries, max_gets),
    apply_default(TMO, response_timeout),
    Msg = Line-Value,
    paxos_message(retrieve(Key,Nr,Line,Value), TMO, Retrieve),
    node(Node),
    between(0, Retries, _),
      life_quorum(Quorum, Alive),
      QuorumA is Alive /\ \(1<<Node),
      collect(QuorumA, false, Nr, Msg, Retrieve, Terms, RetrievedNodes),
      debug(paxos, 'Retrieved: ~p from 0x~16r', [Terms, RetrievedNodes]),
      highest_vote(Terms, _Line-MajorityValue, Count),
      debug(paxos, 'Best: ~p with ~d votes', [MajorityValue, Count]),
      Count >= (popcount(QuorumA)+2)//2,
      debug(paxos, 'Retrieve: accept ~p', [MajorityValue]),
      update_failed(get, Quorum, RetrievedNodes),
      paxos_set(Key, MajorityValue),    % Is this needed?
    !.

highest_vote(Terms, Term, Count) :-
    msort(Terms, Sorted),
    count_votes(Sorted, Counted),
    sort(1, @>, Counted, [Count-Term|_]).

count_votes([], []).
count_votes([H|T0], [N-H|T]) :-
    count_same(H, T0, 1, N, R),
    count_votes(R, T).

count_same(H, [Hc|T0], C0, C, R) :-
    H == Hc,
    !,
    C1 is C0+1,
    count_same(H, T0, C1, C, R).
count_same(_, R, C, C, R).
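The retrieval step above tallies the Line-Value answers from the peers and accepts the most frequent one only if it was given by a majority of the nodes asked. The following stand-alone sketch shows that decision on plain lists, assuming ground answers. The predicates answer_count/3, best_answer/3 and decide/3 are hypothetical names, and the counting is done with include/3 rather than the run-length scan used above.

```prolog
:- use_module(library(apply)).
:- use_module(library(lists)).

% answer_count(+Answers, -Answer, -Count)
% Enumerate each distinct Answer with its number of occurrences.
answer_count(Answers, Answer, Count) :-
    sort(Answers, Unique),
    member(Answer, Unique),
    include(==(Answer), Answers, Same),
    length(Same, Count).

% best_answer(+Answers, -Best, -Count)
% Best is the most frequent answer. Fails if Answers is empty.
best_answer(Answers, Best, Count) :-
    findall(C-A, answer_count(Answers, A, C), Counted),
    sort(1, @>=, Counted, [Count-Best|_]).

% decide(+Answers, +Asked, -Value)
% Answers is a list of Gen-Value pairs and Asked the bitmap of nodes
% that were asked. Succeeds with Value if a majority agrees on it.
decide(Answers, Asked, Value) :-
    best_answer(Answers, _Gen-Value, Count),
    Count >= (popcount(Asked)+2)//2.
```

With three nodes asked (bitmap 7), decide([3-a, 3-a, 2-b], 7, V) yields V = a, while decide([3-a, 2-b], 7, V) fails because no answer reaches two votes.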
The key of a compound term is the term '$c'(Name,Arity). Note that we do not use Name/Arity, as X/Y is naturally used to organize keys as hierarchical paths.

paxos_key(Compound, '$c'(Name,Arity)) :-
    compound(Compound), !,
    compound_name_arity(Compound, Name, Arity).
paxos_key(Compound, _) :-
    must_be(compound, Compound).


                 /*******************************
                 *          REPLICATION         *
                 *******************************/
start_replicator :-
    catch(thread_send_message(paxos_replicator, run),
          error(existence_error(_,_),_),
          fail),
    !.
start_replicator :-
    catch(thread_create(replicator, _,
                        [ alias(paxos_replicator),
                          detached(true)
                        ]),
          error(permission_error(_,_,_),_),
          true).

replicator :-
    setting(replication_rate, ReplRate),
    ReplSleep is 1/ReplRate,
    node(Node),
    debug(paxos(replicate), 'Starting replicator', []),
    State = state(idle),
    repeat,
      quorum(Quorum),
      dead(Dead),
      LifeQuorum is Quorum /\ \Dead,
      (   LifeQuorum /\ \(1<<Node) =:= 0
      ->  debug(paxos(replicate),
                'Me: ~d, Quorum: 0x~16r, Dead: 0x~16r: I''m alone, waiting ...',
                [Node, Quorum, Dead]),
          thread_get_message(_)
      ;   (   paxos_replicate_key(LifeQuorum, Key, [])
          ->  replicated(State, key(Key)),
              thread_self(Me),
              thread_get_message(Me, _, [timeout(ReplSleep)])
          ;   replicated(State, idle),
              thread_get_message(_)
          )
      ),
      fail.

replicated(State, key(_Key)) :-
    arg(1, State, idle),
    !,
    debug(paxos(replicate), 'Start replicating ...', []),
    nb_setarg(1, State, 1).
replicated(State, key(_Key)) :-
    !,
    arg(1, State, C0),
    C is C0+1,
    nb_setarg(1, State, C).
replicated(State, idle) :-
    arg(1, State, idle),
    !.
replicated(State, idle) :-
    arg(1, State, Count),
    debug(paxos(replicate), 'Replicated ~D keys', [Count]),
    nb_setarg(1, State, idle).
paxos_replicate_key/3 processes the option timeout(Seconds), the maximum time to wait for a reply. It defaults to the setting response_timeout (0.020, 20ms).

paxos_replicate_key(Nodes, Key, Options) :-
    replication_key(Nodes, Key),
    option(timeout(TMO), Options, TMO),
    apply_default(TMO, response_timeout),
    ledger_current(Key, Gen, Value, Holders),
    paxos_message(learn(Key,Na,Gen,Ga,Value), TMO, Learn),
    collect(Nodes, Ga == nack, Na, Ga, Learn, _Gas, LearnedNodes),
    NewHolders is Holders \/ LearnedNodes,
    paxos_message(learned(Key,Gen,Value,NewHolders), -, Learned),
    broadcast(Learned),
    update_failed(replicate, Nodes, LearnedNodes).

replication_key(_Nodes, Key) :-
    ground(Key),
    !.
replication_key(Nodes, Key) :-
    (   Nth is 1+random(popcount(Nodes))
    ;   Nth = 1
    ),
    call_nth(needs_replicate(Nodes, Key), Nth),
    !.

needs_replicate(Nodes, Key) :-
    ledger_current(Key, _Gen, _Value, Holders),
    Nodes /\ \Holders =\= 0,
    \+ paxos_admin_key(_, Key).


                 /*******************************
                 *      KEY CHANGE EVENTS       *
                 *******************************/
paxos_on_change/3 listens for paxos_changed(Key,Value) notifications for Key, which are emitted as the result of successful paxos_set/3 transactions. When one is received for Key, then Goal is executed in a separate thread of execution.
paxos_on_change(Term, Goal) :-
    paxos_key(Term, Key),
    paxos_on_change(Key, Term, Goal).

paxos_on_change(Key, Value, Goal) :-
    Goal = _:Plain,
    must_be(callable, Plain),
    (   Plain == ignore
    ->  unlisten(paxos_user, paxos_changed(Key,Value))
    ;   listen(paxos_user, paxos_changed(Key,Value),
               key_changed(Key, Value, Goal)),
        paxos_initialize
    ).

key_changed(_Key, _Value, Goal) :-
    E = error(_,_),
    catch(thread_create(Goal, _, [detached(true)]),
          E, key_error(E)).

key_error(error(permission_error(create, thread, _), _)) :-
    !.
key_error(E) :-
    print_message(error, E).


                 /*******************************
                 *            HOOKS             *
                 *******************************/
paxos_message(Paxos:From, TMO, Message) :-
    paxos_message_raw(paxos(Paxos):From, TMO, Message).
paxos_message(Paxos, TMO, Message) :-
    paxos_message_raw(paxos(Paxos), TMO, Message).

paxos_message_raw(Message, TMO, WireMessage) :-
    paxos_message_hook(Message, TMO, WireMessage),
    !.
paxos_message_raw(Message, TMO, WireMessage) :-
    throw(error(mode_error(det, fail,
                           paxos:paxos_message_hook(Message, TMO, WireMessage)), _)).


                 /*******************************
                 *           STORAGE            *
                 *******************************/
:- dynamic
    paxons_ledger/4.                    % Key, Gen, Value, Holders
ledger_current(Key, Gen, Value, Holders) :-
    paxos_ledger_hook(current, Key, Gen, Value, Holders).
ledger_current(Key, Gen, Value, Holders) :-
    paxons_ledger(Key, Gen, Value, Holders),
    valid(Holders).
ledger(Key, Gen, Value) :-
    paxos_ledger_hook(get, Key, Gen, Value0, Holders),
    !,
    valid(Holders),
    Value = Value0.
ledger(Key, Gen, Value) :-
    paxons_ledger(Key, Gen, Value0, Holders),
    valid(Holders),
    !,
    Value = Value0.
ledger_create(Key, Gen, Value) :-
    paxos_ledger_hook(create, Key, Gen, Value, -),
    !.
ledger_create(Key, Gen, Value) :-
    get_time(Now),
    asserta(paxons_ledger(Key, Gen, Value, created(Now))).
ledger_update(Key, Gen, Value) :-
    paxos_ledger_hook(accept, Key, Gen, Value, -),
    !.
ledger_update(Key, Gen, Value) :-
    paxons_ledger(Key, Gen0, _Value, _Holders),
    !,
    Gen > Gen0,
    get_time(Now),
    asserta(paxons_ledger(Key, Gen, Value, accepted(Now))),
    (   Gen0 == 0
    ->  retractall(paxons_ledger(Key, Gen0, _, _))
    ;   true
    ).
ledger_update_holders(Key, Gen, Holders) :-
    paxos_ledger_hook(set, Key, Gen, _, Holders),
    !.
ledger_update_holders(Key, Gen, Holders) :-
    clause(paxons_ledger(Key, Gen, Value, Holders0), true, Ref),
    !,
    (   Holders0 == Holders
    ->  true
    ;   asserta(paxons_ledger(Key, Gen, Value, Holders)),
        erase(Ref)
    ),
    clean_key(Holders0, Key, Gen).

clean_key(Holders, _Key, _Gen) :-
    valid(Holders),
    !.
clean_key(_, Key, Gen) :-
    (   clause(paxons_ledger(Key, Gen0, _Value, _Holders0), true, Ref),
        Gen0 < Gen,
        erase(Ref),
        fail
    ;   true
    ).
ledger_learn(Key,Gen,Value) :-
    paxos_ledger_hook(learn, Key, Gen, Value, -),
    !.
ledger_learn(Key,Gen,Value) :-
    paxons_ledger(Key, Gen0, Value0, _Holders),
    !,
    (   Gen == Gen0,
        Value == Value0
    ->  true
    ;   Gen > Gen0
    ->  get_time(Now),
        asserta(paxons_ledger(Key, Gen, Value, learned(Now)))
    ).
ledger_learn(Key,Gen,Value) :-
    get_time(Now),
    asserta(paxons_ledger(Key, Gen, Value, learned(Now))).
ledger_forget(Nodes) :-
    catch(thread_create(ledger_forget_threaded(Nodes), _,
                        [ detached(true)
                        ]),
          error(permission_error(create, thread, _), _),
          true).

ledger_forget_threaded(Nodes) :-
    debug(paxos(node), 'Forgetting 0x~16r', [Nodes]),
    forall(ledger_current(Key, Gen, _Value, Holders),
           ledger_forget(Nodes, Key, Gen, Holders)),
    debug(paxos(node), 'Forgotten 0x~16r', [Nodes]).

ledger_forget(Nodes, Key, Gen, Holders) :-
    NewHolders is Holders /\ \Nodes,
    (   NewHolders \== Holders,
        ledger_update_holders(Key, Gen, NewHolders)
    ->  true
    ;   true
    ).

valid(Holders) :-
    integer(Holders).


                 /*******************************
                 *             UTIL             *
                 *******************************/
c_element([New | More], _Old, New) :-
    forall(member(N, More), N == New),
    !.
c_element(_List, Old, Old).
arg_union(Arg, NodeStatusList, Set) :-
    maplist(arg(Arg), NodeStatusList, Sets),
    list_union(Sets, Set).

list_union(Sets, Set) :-
    list_union(Sets, 0, Set).

list_union([], Set, Set).
list_union([H|T], Set0, Set) :-
    Set1 is Set0 \/ H,
    list_union(T, Set1, Set).
A Replicated Data Store
This module provides a replicated data store that is coordinated using a variation on Lamport's Paxos consensus protocol. The original method is described in his paper entitled "The Part-time Parliament", which was published in 1998. The algorithm is tolerant of non-Byzantine failure, that is, late or lost delivery or reply, but not senseless delivery or reply. The present algorithm takes advantage of the convenience offered by multicast to the quorum's membership, who can remain anonymous and who can come and go as they please without affecting Liveness or Safety properties.
Paxos' quorum is a set of one or more attentive members, whose processes respond to queries within some known time limit (< 20ms), which includes roundtrip delivery delay. This property is easy to satisfy given that every coordinator is necessarily a member of the quorum as well, and a quorum of one is permitted. An inattentive member (e.g. one whose actions are late or lost) is deemed to be "not-present" for the purposes of the present transaction and consistency cannot be assured for that member. As long as there is at least one attentive member of the quorum, then persistence of the database is assured.
Each member maintains a ledger of terms along with information about when they were originally recorded. The member's ledger is deterministic. That is to say that there can only be one entry per functor/arity combination. No member will accept a new term proposal that has a line number that is equal-to or lower-than the one that is already recorded in the ledger.
Paxos is a three-phase protocol:
1: A coordinator first prepares the quorum for a new proposal by broadcasting a proposed term. The quorum responds by returning the last known line number for that functor/arity combination that is recorded in their respective ledgers.
2: The coordinator selects the highest line number it receives, increments it by one, and then asks the quorum to finally accept the new term with the new line number. The quorum checks their respective ledgers once again and if there is still no other ledger entry for that functor/arity combination that is equal-to or higher than the specified line, then each member records the term in the ledger at the specified line. The member indicates consent by returning the specified line number back to the coordinator. If consent is withheld by a member, then the member returns a nack instead. The coordinator requires unanimous consent. If it isn't achieved then the proposal fails and the coordinator must start over from the beginning.
3: Finally, the coordinator concludes the successful negotiation by broadcasting the agreement to the quorum in the form of a paxos_changed(Key,Value) event. This is the only event that should be of interest to user programs.
For practical reasons, we rely on the partially synchronous behavior (e.g. limited upper time bound for replies) of broadcast_request/1 over TIPC to ensure Progress. Perhaps more importantly, we rely on the fact that the TIPC broadcast listener state machine guarantees the atomicity of broadcast_request/1 at the process level, thus obviating the need for external mutual exclusion mechanisms.
Note that this algorithm does not guarantee the rightness of the value proposed. It only guarantees that if successful, the value proposed is identical for all attentive members of the quorum.
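The prepare and accept phases described above can be sketched as a single-process simulation over an in-memory list of acceptors. This is an illustration under strong simplifying assumptions: there is no network, no timeout, no majority test and no final paxos_changed broadcast, and every acceptor always replies. The acc/3 terms and the predicates sim_prepare/2, sim_accept/5, sim_accept_all/5 and sim_propose/4 are hypothetical names that do not exist in this library.

```prolog
:- use_module(library(lists)).

% An acceptor is acc(Node, Gen, Value); Gen 0 means "key not yet known".

% Phase 1: every acceptor reports the generation it has recorded.
sim_prepare(Acceptors, Gens) :-
    findall(Gen, member(acc(_, Gen, _), Acceptors), Gens).

% Phase 2, one acceptor: record Value at Gen only if Gen is newer than
% the recorded generation. Reply is Gen on consent and nack otherwise.
sim_accept(Gen, Value, acc(N, Gen0, _), acc(N, Gen, Value), Gen) :-
    Gen > Gen0,
    !.
sim_accept(_, _, Acc, Acc, nack).

% Phase 2, all acceptors: collect the new states and the replies.
sim_accept_all([], _, _, [], []).
sim_accept_all([A0|As0], Gen, Value, [A|As], [R|Rs]) :-
    sim_accept(Gen, Value, A0, A, R),
    sim_accept_all(As0, Gen, Value, As, Rs).

% One coordinator round: take the highest reported generation, add one
% and ask everybody to accept. Fails unless consent is unanimous.
sim_propose(Value, Acceptors0, Acceptors, Gen) :-
    sim_prepare(Acceptors0, Gens),
    max_list(Gens, Max),
    Gen is Max+1,
    sim_accept_all(Acceptors0, Gen, Value, Acceptors, Replies),
    forall(member(R, Replies), R == Gen).
```

Proposing v1 to acceptors holding generations 0, 3 and 1 records v1 at generation 4 on all three. An acceptor that already holds generation 2 answers nack to a proposal at generation 2 and keeps its old value.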
See also: tipc_broadcast.pl, udp_broadcast.pl
*/