{"name":"replicated_counter.nu","source":"// examples/replicated_counter.nu — a PNCounter replicated across the group by\n// CRDT gossip over the overlay (TODO §7.4 Phase 6). Each node increments its\n// OWN replica slot, periodically broadcasts its encoded counter to the group\n// (transport_broadcast → relay multicast), and merges every counter it\n// receives. Because PNCounter.merge is a CRDT merge, all nodes converge to\n// the same total with no coordination.\n//\n//   ./nurl.sh examples/replicated_counter.nu <relay_host> <relay_port> <replica_id>\n//\n// Run a relay (examples/relay.nu) and several of these with distinct ids; the\n// printed value converges to the sum of everyone's increments.\n\n$ `stdlib/core/string.nu`\n$ `stdlib/core/vec.nu`\n$ `stdlib/std/time.nu`\n$ `stdlib/ext/env.nu`\n$ `stdlib/net/relay.nu`\n$ `stdlib/net/transport.nu`\n$ `stdlib/dist/crdt.nu`\n$ `stdlib/dist/replicator.nu`\n$ `stdlib/dist/identity.nu`\n\n@ pk_for i id → ( Vec u ) { : ( Vec u ) v ( vec_new [u] ) : ~ i k 0 ~ < k 32 { ( vec_push [u] v # u + id k ) = k + k 1 } ^ v }\n\n@ group_id → ( Vec u ) { : ( Vec u ) v ( vec_new [u] ) : ~ i k 0 ~ < k 32 { ( vec_push [u] v # u + 200 k ) = k + k 1 } ^ v }\n\n@ main → i {\n    : i argc ( env_args_count )\n    : String host ? > argc 1 ( env_arg 1 ) ( string_from `127.0.0.1` )\n    : i port ? > argc 2 {\n        : String ps ( env_arg 2 ) : i p ( nurl_str_to_int ( string_data ps ) ) ( string_free ps ) p\n    } 47700\n    : i rid ? > argc 3 {\n        : String rs ( env_arg 3 ) : i r ( nurl_str_to_int ( string_data rs ) ) ( string_free rs ) r\n    } 0\n\n    ?? ( relay_dial ( string_data host ) port ) {\n        T rc → {\n            : ( Vec u ) self_pk ( pk_for + 1 rid )\n            // The CRDT replica id comes from the PUBKEY, not the CLI arg: every\n            // node derives the same id for a given pubkey with no coordination,\n            // so distinct replicas never collide into one counter slot.\n            : i crep ( identity_stable_id self_pk )\n            ?? ( relay_register rc self_pk ) { T _ → {} F _ → {} }\n            ( relay_set_timeout rc 300 )\n            : *Transport tr # *Transport ( transport_open # s 0 rc 1 )\n            : ( Vec u ) g ( group_id )\n            ?? ( transport_group_join tr g ) { T _ → {} F _ → {} }\n\n            : *PNCounter ctr ( pncounter_new )\n\n            : ~ i round 0\n            ~ < round 5 {\n                ( pncounter_inc ctr crep 1 )  // count our own work (pubkey-stable slot)\n                : ( Vec u ) wire ( pncounter_encode ctr )\n                ?? ( transport_broadcast tr g wire ) { T _ → {} F _ → {} }\n                ( vec_free [u] wire )\n                // drain inbound counters and merge them\n                : ~ b more T\n                ~ more {\n                    ?? ( transport_recv tr 100 ) {\n                        T m → { ( pncounter_merge_bytes ctr . m payload ) ( transport_msg_free m ) }\n                        F → { = more F }\n                    }\n                }\n                ( sleep_ms 200 )\n                = round + round 1\n            }\n\n            ( nurl_print `replica ` ) ( nurl_print_int rid )\n            ( nurl_print ` converged value = ` ) ( nurl_print_int ( pncounter_value ctr ) ) ( nurl_print `\\n` )\n\n            ( pncounter_free ctr )\n            ( vec_free [u] self_pk ) ( vec_free [u] g )\n            ( transport_free tr ) ( relay_close rc )\n        }\n        F e → ( nurl_print `could not reach relay\\n` )\n    }\n    ( string_free host )\n    ^ 0\n}\n","bytes":3461}