{"name":"distributed_map.nu","source":"// examples/distributed_map.nu — distributed map / word-count over the overlay\n// (TODO §7.5 Phase 11, the keystone in action). A coordinator shards a corpus\n// into chunks, each KEYED so dist/ring routes it to the worker that OWNS that\n// key; the owning worker runs the registered word-count handler and returns\n// the count; the coordinator awaits all and sums them. Real distributed\n// computation — no central executor, work placed by consistent hashing,\n// results flow back over net/transport and are recorded idempotently.\n//\n// Run as separate processes (a relay, two workers, one coordinator):\n//\n//   ./nurl.sh examples/relay.nu              0.0.0.0   47700\n//   ./nurl.sh examples/distributed_map.nu    worker    127.0.0.1 47700 1\n//   ./nurl.sh examples/distributed_map.nu    worker    127.0.0.1 47700 2\n//   ./nurl.sh examples/distributed_map.nu    map       127.0.0.1 47700\n//\n// The coordinator prints the aggregate word count (expected 17).\n\n$ `stdlib/core/string.nu`\n$ `stdlib/core/vec.nu`\n$ `stdlib/std/bytes.nu`\n$ `stdlib/ext/env.nu`\n$ `stdlib/net/relay.nu`\n$ `stdlib/net/transport.nu`\n$ `stdlib/dist/ring.nu`\n$ `stdlib/dist/job.nu`\n\n@ pk_for i id → ( Vec u ) { : ( Vec u ) v ( vec_new [u] ) : ~ i k 0 ~ < k 32 { ( vec_push [u] v # u + id k ) = k + k 1 } ^ v }\n\n@ key_for i id → ( Vec u ) { : ( Vec u ) v ( vec_new [u] ) ( vec_push [u] v # u & id 255 ) ( vec_push [u] v # u & >> id 8 255 ) ( vec_push [u] v # u 200 ) ( vec_push [u] v # u 7 ) ^ v }\n\n@ chunk_text i id → String {\n    ^ ? == id 0 ( string_from `the quick brown fox` )\n    ? == id 1 ( string_from `jumps over the lazy dog` )\n    ? == id 2 ( string_from `a b c d e` )\n    ( string_from `one two three` )\n}\n\n@ str_bytes String s → ( Vec u ) {\n    : ( Vec u ) v ( vec_new [u] )\n    : s cs ( string_data s ) : i n ( string_len s ) : *u sp # *u cs\n    : ~ i k 0 ~ < k n { ( vec_push [u] v # u . sp k ) = k + k 1 }\n    ^ v\n}\n\n// word-count handler: payload = text bytes → result = 4-byte BE word count\n@ wordcount_handler → ( @ ( Vec u ) ( Vec u ) ) {\n    ^ \\ ( Vec u ) p → ( Vec u ) {\n        : i n ( vec_len [u] p )\n        : ~ i words 0\n        : ~ i in_word 0\n        : ~ i k 0\n        ~ < k n {\n            : i ch ?? ( vec_get [u] p k ) { T x → # i x F → 32 }\n            ? == ch 32 { = in_word 0 } { ? == in_word 0 { = words + words 1 = in_word 1 } {} }\n            = k + k 1\n        }\n        : ( Vec u ) r ( vec_new [u] )\n        ( bytes_push_u32_be r # u32 words )\n        ^ r\n    }\n}\n\n@ make_ring → *Ring {\n    : *Ring r ( ring_new )\n    : ( Vec u ) w0 ( pk_for 1 ) : ( Vec u ) w1 ( pk_for 2 )\n    ( ring_add_member r w0 64 ) ( ring_add_member r w1 64 )\n    ( vec_free [u] w0 ) ( vec_free [u] w1 )\n    ^ r\n}\n\n@ run_worker s host i port i id → v {\n    ?? ( relay_dial host port ) {\n        T rc → {\n            : ( Vec u ) me ( pk_for id )\n            ?? ( relay_register rc me ) { T _ → {} F _ → {} }\n            ( relay_set_timeout rc 250 )\n            : *Transport tr # *Transport ( transport_open # s 0 rc 1 )\n            : *Ring ring ( make_ring )\n            : *JobNode jn ( job_node_new # s tr # s ring me id )\n            ( job_register jn 0 ( wordcount_handler ) )\n            ( nurl_print `worker ` ) ( nurl_print_int id ) ( nurl_print ` ready\\n` )\n            // blocking pump (recv honours the socket timeout outside fibers)\n            : ~ i ticks 0\n            ~ < ticks 40 { ( job_pump jn 200 ) = ticks + ticks 1 }\n            ( job_node_free jn ) ( ring_free ring ) ( transport_free tr )\n            ( vec_free [u] me ) ( relay_close rc )\n        }\n        F e → ( nurl_print `worker dial failed\\n` )\n    }\n}\n\n@ run_coordinator s host i port → v {\n    ?? ( relay_dial host port ) {\n        T rc → {\n            : ( Vec u ) me ( pk_for 99 )\n            ?? ( relay_register rc me ) { T _ → {} F _ → {} }\n            ( relay_set_timeout rc 250 )\n            : *Transport tr # *Transport ( transport_open # s 0 rc 1 )\n            : *Ring ring ( make_ring )\n            : *JobNode jn ( job_node_new # s tr # s ring me 99 )\n\n            : ( Vec i ) tids ( vec_new [i] )\n            : ~ i i 0\n            ~ < i 4 {\n                : ( Vec u ) key ( key_for i )\n                : String txt ( chunk_text i )\n                : ( Vec u ) payload ( str_bytes txt )\n                ( vec_push [i] tids ( job_submit jn 0 key payload ) )\n                ( vec_free [u] key ) ( string_free txt ) ( vec_free [u] payload )\n                = i + i 1\n            }\n\n            // collect: pump until all four results land (or give up)\n            : ~ i rounds 0\n            : ~ b done F\n            ~ & ! done < rounds 40 {\n                ( job_pump jn 200 )\n                : ~ b all T : ~ i j 0\n                ~ < j 4 { ? ! ( job_has jn ?? ( vec_get [i] tids j ) { T x → x F → 0 } ) { = all F } {} = j + j 1 }\n                = done all\n                = rounds + rounds 1\n            }\n\n            : ~ i total 0\n            : ~ i j 0\n            ~ < j 4 {\n                ?? ( job_await jn ?? ( vec_get [i] tids j ) { T x → x F → 0 } ) {\n                    T r → { = total + total ?? ( bytes_read_u32_be r 0 ) { T x → # i x F → 0 } ( vec_free [u] r ) }\n                    F → {}\n                }\n                = j + j 1\n            }\n            ( nurl_print `distributed word count total = ` ) ( nurl_print_int total ) ( nurl_print ` (expected 17)\\n` )\n\n            ( vec_free [i] tids )\n            ( job_node_free jn ) ( ring_free ring ) ( transport_free tr )\n            ( vec_free [u] me ) ( relay_close rc )\n        }\n        F e → ( nurl_print `coordinator dial failed\\n` )\n    }\n}\n\n@ main → i {\n    : i argc ( env_args_count )\n    ? < argc 3 { ( nurl_print `usage: distributed_map <worker|map> <host> <port> [id]\\n` ) ^ 1 } {}\n    : String role ( env_arg 1 )\n    : String host ( env_arg 2 )\n    : String ps ( env_arg 3 ) : i port ( nurl_str_to_int ( string_data ps ) ) ( string_free ps )\n\n    ? != 0 ( nurl_str_eq ( string_data role ) `worker` ) {\n        : i id ? > argc 4 { : String is ( env_arg 4 ) : i v ( nurl_str_to_int ( string_data is ) ) ( string_free is ) v } 1\n        ( run_worker ( string_data host ) port id )\n    } {\n        ( run_coordinator ( string_data host ) port )\n    }\n    ( string_free role ) ( string_free host )\n    ^ 0\n}\n","bytes":6314}