Skip to content

Commit 66affbd

Browse files
committed
Add support for encoded local pids in external terms
Also add partial support for external pids. Signed-off-by: Paul Guyot <pguyot@kallisys.net>
1 parent 89c334c commit 66affbd

File tree

10 files changed

+499
-57
lines changed

10 files changed

+499
-57
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
99
### Added
1010
- Added a limited implementation of the OTP `ets` interface
1111
- Added `code:all_loaded/0` and `code:all_available/0`
12+
- Added support for external pids and encoded pids in external terms
1213

1314
## [0.6.6] - Unreleased
1415

src/libAtomVM/ets_hashtable.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,7 @@ static uint32_t hash_term_incr(term t, int32_t h, GlobalContext *global)
285285
return hash_integer(t, h, global);
286286
} else if (term_is_float(t)) {
287287
return hash_float(t, h, global);
288-
} else if (term_is_pid(t)) {
288+
} else if (term_is_local_pid(t)) {
289289
return hash_pid(t, h, global);
290290
} else if (term_is_reference(t)) {
291291
return hash_reference(t, h, global);

src/libAtomVM/externalterm.c

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,17 @@
2929
#include <stdlib.h>
3030

3131
#include "bitstring.h"
32+
#include "defaultatoms.h"
33+
#include "term.h"
3234
#include "unicode.h"
3335
#include "utils.h"
3436

3537
#define NEW_FLOAT_EXT 70
38+
#define NEW_PID_EXT 88
3639
#define SMALL_INTEGER_EXT 97
3740
#define INTEGER_EXT 98
3841
#define ATOM_EXT 100
42+
#define PID_EXT 103
3943
#define SMALL_TUPLE_EXT 104
4044
#define LARGE_TUPLE_EXT 105
4145
#define NIL_EXT 106
@@ -390,6 +394,33 @@ static int serialize_term(uint8_t *buf, term t, GlobalContext *glb)
390394
k += serialize_term(IS_NULL_PTR(buf) ? NULL : buf + k, mfa, glb);
391395
}
392396
return k;
397+
} else if (term_is_local_pid(t)) {
398+
if (!IS_NULL_PTR(buf)) {
399+
buf[0] = NEW_PID_EXT;
400+
}
401+
size_t k = 1;
402+
term node_name = glb->node_name;
403+
uint32_t creation = node_name == NONODE_AT_NOHOST_ATOM ? 0 : glb->creation;
404+
k += serialize_term(IS_NULL_PTR(buf) ? NULL : buf + k, node_name, glb);
405+
if (!IS_NULL_PTR(buf)) {
406+
WRITE_32_UNALIGNED(buf + k, term_to_local_process_id(t));
407+
WRITE_32_UNALIGNED(buf + k + 4, 0); // serial
408+
WRITE_32_UNALIGNED(buf + k + 8, creation);
409+
}
410+
return k + 12;
411+
} else if (term_is_external_pid(t)) {
412+
if (!IS_NULL_PTR(buf)) {
413+
buf[0] = NEW_PID_EXT;
414+
}
415+
size_t k = 1;
416+
term node = term_to_external_node(t);
417+
k += serialize_term(IS_NULL_PTR(buf) ? NULL : buf + k, node, glb);
418+
if (!IS_NULL_PTR(buf)) {
419+
WRITE_32_UNALIGNED(buf + k, term_to_external_pid_process_id(t));
420+
WRITE_32_UNALIGNED(buf + k + 4, term_to_external_pid_serial(t));
421+
WRITE_32_UNALIGNED(buf + k + 8, term_to_external_node_creation(t));
422+
}
423+
return k + 12;
393424
} else {
394425
fprintf(stderr, "Unknown external term type: %" TERM_U_FMT "\n", t);
395426
AVM_ABORT();
@@ -659,6 +690,32 @@ static term parse_external_terms(const uint8_t *external_term_buf, size_t *eterm
659690
return term_from_atom_index(global_atom_id);
660691
}
661692

693+
case NEW_PID_EXT: {
694+
size_t node_size;
695+
term node = parse_external_terms(external_term_buf + 1, &node_size, copy, heap, glb);
696+
if (UNLIKELY(!term_is_atom(node))) {
697+
return term_invalid_term();
698+
}
699+
uint32_t number = READ_32_UNALIGNED(external_term_buf + node_size + 1);
700+
uint32_t serial = READ_32_UNALIGNED(external_term_buf + node_size + 5);
701+
uint32_t creation = READ_32_UNALIGNED(external_term_buf + node_size + 9);
702+
*eterm_size = node_size + 13;
703+
if (node != NONODE_AT_NOHOST_ATOM) {
704+
term this_node = glb->node_name;
705+
uint32_t this_creation = this_node == NONODE_AT_NOHOST_ATOM ? 0 : glb->creation;
706+
if (node == this_node && creation == this_creation) {
707+
return term_from_local_process_id(number);
708+
} else {
709+
return term_from_external_process_id(node, number, serial, creation, heap);
710+
}
711+
} else {
712+
if (UNLIKELY(serial != 0 || creation != 0)) {
713+
return term_invalid_term();
714+
}
715+
return term_from_local_process_id(number);
716+
}
717+
}
718+
662719
default:
663720
return term_invalid_term();
664721
}
@@ -948,6 +1005,33 @@ static int calculate_heap_usage(const uint8_t *external_term_buf, size_t remaini
9481005
return 0;
9491006
}
9501007

1008+
case NEW_PID_EXT: {
1009+
if (UNLIKELY(remaining < 1)) {
1010+
return INVALID_TERM_SIZE;
1011+
}
1012+
remaining -= 1;
1013+
int buf_pos = 1;
1014+
size_t heap_size = EXTERNAL_PID_SIZE;
1015+
size_t node_size = 0;
1016+
int u = calculate_heap_usage(external_term_buf + buf_pos, remaining, &node_size, copy);
1017+
if (UNLIKELY(u == INVALID_TERM_SIZE)) {
1018+
return INVALID_TERM_SIZE;
1019+
}
1020+
if (external_term_buf[1] == SMALL_ATOM_UTF8_EXT) {
1021+
// Check if it's non-distributed node, in which case it's always a local pid
1022+
if (external_term_buf[2] == strlen("nonode@nohost") && memcmp(external_term_buf + 3, "nonode@nohost", strlen("nonode@nohost")) == 0) {
1023+
heap_size = 0;
1024+
}
1025+
// If this is our node, but we're distributed, we'll allocate more memory and may not use it.
1026+
// This way we're sure to not go out of bounds if distribution changes between now and when we deserialize
1027+
} else if (UNLIKELY(external_term_buf[1] != ATOM_EXT)) {
1028+
return INVALID_TERM_SIZE;
1029+
}
1030+
buf_pos += node_size;
1031+
*eterm_size = buf_pos + 12;
1032+
return heap_size + u;
1033+
}
1034+
9511035
default:
9521036
return INVALID_TERM_SIZE;
9531037
}

src/libAtomVM/memory.c

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -476,7 +476,11 @@ unsigned long memory_estimate_usage(term t)
476476
} else if (term_is_nil(t)) {
477477
t = temp_stack_pop(&temp_stack);
478478

479-
} else if (term_is_pid(t)) {
479+
} else if (term_is_local_pid(t)) {
480+
t = temp_stack_pop(&temp_stack);
481+
482+
} else if (term_is_external_pid(t)) {
483+
acc += EXTERNAL_PID_SIZE;
480484
t = temp_stack_pop(&temp_stack);
481485

482486
} else if (term_is_nonempty_list(t)) {
@@ -587,7 +591,7 @@ static void memory_scan_and_copy(HeapFragment *old_fragment, term *mem_start, co
587591
TRACE("Found NIL (%" TERM_X_FMT ")\n", t);
588592
ptr++;
589593

590-
} else if (term_is_pid(t)) {
594+
} else if (term_is_local_pid(t)) {
591595
TRACE("Found PID (%" TERM_X_FMT ")\n", t);
592596
ptr++;
593597

@@ -620,6 +624,10 @@ static void memory_scan_and_copy(HeapFragment *old_fragment, term *mem_start, co
620624
TRACE("- Found ref.\n");
621625
break;
622626

627+
case TERM_BOXED_EXTERNAL_PID:
628+
TRACE("- Found external pid.\n");
629+
break;
630+
623631
case TERM_BOXED_FUN: {
624632
int fun_size = term_get_size_from_boxed_header(t);
625633
TRACE("- Found fun, size: %i.\n", fun_size);
@@ -740,6 +748,10 @@ static void memory_scan_and_rewrite(size_t count, term *terms, const term *old_s
740748
ptr += term_get_size_from_boxed_header(t);
741749
break;
742750

751+
case TERM_BOXED_EXTERNAL_PID:
752+
ptr += EXTERNAL_PID_SIZE - 1;
753+
break;
754+
743755
case TERM_BOXED_FUN:
744756
// Skip header and module and process next terms
745757
ptr++;
@@ -810,7 +822,7 @@ HOT_FUNC static term memory_shallow_copy_term(HeapFragment *old_fragment, term t
810822
} else if (term_is_nil(t)) {
811823
return t;
812824

813-
} else if (term_is_pid(t)) {
825+
} else if (term_is_local_pid(t)) {
814826
return t;
815827

816828
} else if (term_is_cp(t)) {

src/libAtomVM/nifs.c

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -996,7 +996,7 @@ static term nif_erlang_register_2(Context *ctx, int argc, term argv[])
996996
term reg_name_term = argv[0];
997997
VALIDATE_VALUE(reg_name_term, term_is_atom);
998998
term pid_or_port_term = argv[1];
999-
VALIDATE_VALUE(pid_or_port_term, term_is_pid);
999+
VALIDATE_VALUE(pid_or_port_term, term_is_local_pid);
10001000

10011001
int atom_index = term_to_atom_index(reg_name_term);
10021002
int32_t pid = term_to_local_process_id(pid_or_port_term);
@@ -1400,7 +1400,7 @@ static term nif_erlang_send_2(Context *ctx, int argc, term argv[])
14001400
term target = argv[0];
14011401
GlobalContext *glb = ctx->global;
14021402

1403-
if (term_is_pid(target)) {
1403+
if (term_is_local_pid(target)) {
14041404
int32_t local_process_id = term_to_local_process_id(target);
14051405

14061406
globalcontext_send_message(glb, local_process_id, argv[1]);
@@ -2741,7 +2741,7 @@ static term nif_erlang_process_flag(Context *ctx, int argc, term argv[])
27412741
flag = argv[1];
27422742
value = argv[2];
27432743

2744-
VALIDATE_VALUE(pid, term_is_pid);
2744+
VALIDATE_VALUE(pid, term_is_local_pid);
27452745
int local_process_id = term_to_local_process_id(pid);
27462746
Context *target = globalcontext_get_process_lock(ctx->global, local_process_id);
27472747
if (IS_NULL_PTR(target)) {
@@ -3191,7 +3191,7 @@ static term nif_binary_split(Context *ctx, int argc, term argv[])
31913191

31923192
if (num_segments == 1) {
31933193
// not found
3194-
if (UNLIKELY(memory_ensure_free_with_roots(ctx, 2, 1, argv, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) {
3194+
if (UNLIKELY(memory_ensure_free_with_roots(ctx, LIST_SIZE(1, 0), 1, argv, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) {
31953195
RAISE_ERROR(OUT_OF_MEMORY_ATOM);
31963196
}
31973197

@@ -3443,11 +3443,11 @@ static term nif_erlang_pid_to_list(Context *ctx, int argc, term argv[])
34433443

34443444
term t = argv[0];
34453445
VALIDATE_VALUE(t, term_is_pid);
3446+
size_t max_len = term_is_external(t) ? EXTERNAL_PID_AS_CSTRING_LEN : LOCAL_PID_AS_CSTRING_LEN;
34463447

3447-
char buf[PID_AS_CSTRING_LEN];
3448-
int str_len = term_snprint(buf, PID_AS_CSTRING_LEN, t, ctx->global);
3448+
char buf[max_len];
3449+
int str_len = term_snprint(buf, max_len, t, ctx->global);
34493450
if (UNLIKELY(str_len < 0)) {
3450-
// TODO: change to internal error or something like that
34513451
RAISE_ERROR(OUT_OF_MEMORY_ATOM);
34523452
}
34533453

@@ -3559,7 +3559,7 @@ static term nif_erlang_garbage_collect(Context *ctx, int argc, term argv[])
35593559
} else {
35603560
// argc == 1
35613561
term t = argv[0];
3562-
VALIDATE_VALUE(t, term_is_pid);
3562+
VALIDATE_VALUE(t, term_is_local_pid);
35633563

35643564
int local_id = term_to_local_process_id(t);
35653565
Context *target = globalcontext_get_process_lock(ctx->global, local_id);
@@ -3602,7 +3602,7 @@ static term nif_erlang_exit(Context *ctx, int argc, term argv[])
36023602
RAISE(LOWERCASE_EXIT_ATOM, reason);
36033603
} else {
36043604
term target_process = argv[0];
3605-
VALIDATE_VALUE(target_process, term_is_pid);
3605+
VALIDATE_VALUE(target_process, term_is_local_pid);
36063606
term reason = argv[1];
36073607
GlobalContext *glb = ctx->global;
36083608
Context *target = globalcontext_get_process_lock(glb, term_to_local_process_id(target_process));
@@ -3715,7 +3715,7 @@ static term nif_erlang_monitor(Context *ctx, int argc, term argv[])
37153715
RAISE_ERROR(BADARG_ATOM);
37163716
}
37173717

3718-
VALIDATE_VALUE(target_pid, term_is_pid);
3718+
VALIDATE_VALUE(target_pid, term_is_local_pid);
37193719

37203720
int local_process_id = term_to_local_process_id(target_pid);
37213721
Context *target = globalcontext_get_process_lock(ctx->global, local_process_id);
@@ -3783,7 +3783,7 @@ static term nif_erlang_link(Context *ctx, int argc, term argv[])
37833783

37843784
term target_pid = argv[0];
37853785

3786-
VALIDATE_VALUE(target_pid, term_is_pid);
3786+
VALIDATE_VALUE(target_pid, term_is_local_pid);
37873787

37883788
int local_process_id = term_to_local_process_id(target_pid);
37893789
Context *target = globalcontext_get_process_lock(ctx->global, local_process_id);
@@ -3814,7 +3814,7 @@ static term nif_erlang_unlink(Context *ctx, int argc, term argv[])
38143814

38153815
term target_pid = argv[0];
38163816

3817-
VALIDATE_VALUE(target_pid, term_is_pid);
3817+
VALIDATE_VALUE(target_pid, term_is_local_pid);
38183818

38193819
int local_process_id = term_to_local_process_id(target_pid);
38203820
Context *target = globalcontext_get_process_lock(ctx->global, local_process_id);
@@ -3845,8 +3845,8 @@ static term nif_erlang_group_leader(Context *ctx, int argc, term argv[])
38453845
} else {
38463846
term leader = argv[0];
38473847
term pid = argv[1];
3848-
VALIDATE_VALUE(pid, term_is_pid);
3849-
VALIDATE_VALUE(leader, term_is_pid);
3848+
VALIDATE_VALUE(pid, term_is_local_pid);
3849+
VALIDATE_VALUE(leader, term_is_local_pid);
38503850

38513851
int local_process_id = term_to_local_process_id(pid);
38523852
Context *target = globalcontext_get_process_lock(ctx->global, local_process_id);

src/libAtomVM/opcodesswitch.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2384,7 +2384,7 @@ HOT_FUNC int scheduler_entry_point(GlobalContext *glb)
23842384
#ifdef IMPL_EXECUTE_LOOP
23852385
term recipient_term = x_regs[0];
23862386
int local_process_id;
2387-
if (term_is_pid(recipient_term)) {
2387+
if (term_is_local_pid(recipient_term)) {
23882388
local_process_id = term_to_local_process_id(recipient_term);
23892389
} else if (term_is_atom(recipient_term)) {
23902390
local_process_id = globalcontext_get_registered_process(ctx->global, term_to_atom_index(recipient_term));
@@ -2984,7 +2984,7 @@ HOT_FUNC int scheduler_entry_point(GlobalContext *glb)
29842984
#ifdef IMPL_EXECUTE_LOOP
29852985
TRACE("is_port/2, label=%i, arg1=%lx\n", label, arg1);
29862986

2987-
if (term_is_pid(arg1)) {
2987+
if (term_is_local_pid(arg1)) {
29882988
int local_process_id = term_to_local_process_id(arg1);
29892989
Context *target = globalcontext_get_process_lock(ctx->global, local_process_id);
29902990
bool is_port_driver = false;

src/libAtomVM/posix_nifs.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -427,7 +427,7 @@ static term nif_atomvm_posix_write(Context *ctx, int argc, term argv[])
427427
static term nif_atomvm_posix_select(Context *ctx, term argv[], enum ErlNifSelectFlags mode)
428428
{
429429
term process_pid_term = argv[1];
430-
VALIDATE_VALUE(process_pid_term, term_is_pid);
430+
VALIDATE_VALUE(process_pid_term, term_is_local_pid);
431431
int32_t process_pid = term_to_local_process_id(process_pid_term);
432432
term select_ref_term = argv[2];
433433
if (select_ref_term != UNDEFINED_ATOM) {

src/libAtomVM/term.c

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -189,10 +189,17 @@ int term_funprint(PrinterFun *fun, term t, const GlobalContext *global)
189189
ret += printed;
190190
return ret;
191191
}
192-
} else if (term_is_pid(t)) {
192+
} else if (term_is_local_pid(t)) {
193193
int32_t process_id = term_to_local_process_id(t);
194194
return fun->print(fun, "<0.%" PRIu32 ".0>", process_id);
195195

196+
} else if (term_is_external_pid(t)) {
197+
uint32_t node_atom_index = term_to_atom_index(term_to_external_node(t));
198+
uint32_t number = term_to_external_pid_process_id(t);
199+
uint32_t serial = term_to_external_pid_serial(t);
200+
// creation is not printed
201+
return fun->print(fun, "<%" PRIu32 ".%" PRIu32 ".%" PRIu32 ">", node_atom_index, number, serial);
202+
196203
} else if (term_is_function(t)) {
197204
const term *boxed_value = term_to_const_term_ptr(t);
198205

@@ -664,11 +671,45 @@ TermCompareResult term_compare(term t, term other, TermCompareOpts opts, GlobalC
664671
result = (atom_cmp_result > 0) ? TermGreaterThan : TermLessThan;
665672
break;
666673

667-
} else if (term_is_pid(t) && term_is_pid(other)) {
674+
} else if (term_is_external_pid(t) && term_is_external_pid(other)) {
675+
term node = term_to_external_node(t);
676+
term other_node = term_to_external_node(other);
677+
if (node == other_node) {
678+
uint32_t creation = term_to_external_node_creation(t);
679+
uint32_t other_creation = term_to_external_node_creation(other);
680+
if (creation == other_creation) {
681+
uint32_t serial = term_to_external_pid_serial(t);
682+
uint32_t other_serial = term_to_external_pid_serial(other);
683+
if (serial == other_serial) {
684+
uint32_t process_id = term_to_external_pid_process_id(t);
685+
uint32_t other_process_id = term_to_external_pid_process_id(other);
686+
if (process_id == other_process_id) {
687+
CMP_POP_AND_CONTINUE();
688+
} else {
689+
result = (process_id > other_process_id) ? TermGreaterThan : TermLessThan;
690+
break;
691+
}
692+
} else {
693+
result = (serial > other_serial) ? TermGreaterThan : TermLessThan;
694+
break;
695+
}
696+
} else {
697+
result = (creation > other_creation) ? TermGreaterThan : TermLessThan;
698+
break;
699+
}
700+
} else {
701+
result = (node > other_node) ? TermGreaterThan : TermLessThan;
702+
break;
703+
}
704+
} else if (term_is_local_pid(t) && term_is_local_pid(other)) {
668705
//TODO: handle ports
669706
result = (t > other) ? TermGreaterThan : TermLessThan;
670707
break;
671708

709+
} else if (term_is_pid(t) && term_is_pid(other)) {
710+
result = term_is_local_pid(other) ? TermGreaterThan : TermLessThan;
711+
break;
712+
672713
} else {
673714
result = (term_type_to_index(t) > term_type_to_index(other)) ? TermGreaterThan : TermLessThan;
674715
break;

0 commit comments

Comments
 (0)