-
Notifications
You must be signed in to change notification settings - Fork 17
Expand file tree
/
Copy pathGroup.py
More file actions
849 lines (731 loc) · 33.7 KB
/
Group.py
File metadata and controls
849 lines (731 loc) · 33.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
from ens.ens import Address, HexStr
from web3 import Web3
from story_protocol_python_sdk.abi.CoreMetadataModule.CoreMetadataModule_client import (
CoreMetadataModuleClient,
)
from story_protocol_python_sdk.abi.GroupingModule.GroupingModule_client import (
GroupingModuleClient,
)
from story_protocol_python_sdk.abi.GroupingWorkflows.GroupingWorkflows_client import (
GroupingWorkflowsClient,
)
from story_protocol_python_sdk.abi.IPAccountImpl.IPAccountImpl_client import (
IPAccountImplClient,
)
from story_protocol_python_sdk.abi.IPAssetRegistry.IPAssetRegistry_client import (
IPAssetRegistryClient,
)
from story_protocol_python_sdk.abi.LicenseRegistry.LicenseRegistry_client import (
LicenseRegistryClient,
)
from story_protocol_python_sdk.abi.LicensingModule.LicensingModule_client import (
LicensingModuleClient,
)
from story_protocol_python_sdk.abi.ModuleRegistry.ModuleRegistry_client import (
ModuleRegistryClient,
)
from story_protocol_python_sdk.abi.PILicenseTemplate.PILicenseTemplate_client import (
PILicenseTemplateClient,
)
from story_protocol_python_sdk.types.common import RevShareType
from story_protocol_python_sdk.types.resource.Group import (
ClaimReward,
ClaimRewardsResponse,
CollectRoyaltiesResponse,
)
from story_protocol_python_sdk.utils.constants import ZERO_ADDRESS, ZERO_HASH
from story_protocol_python_sdk.utils.licensing_config_data import LicensingConfigData
from story_protocol_python_sdk.utils.sign import Sign
from story_protocol_python_sdk.utils.transaction_utils import build_and_send_transaction
from story_protocol_python_sdk.utils.validation import get_revenue_share
class Group:
"""
A class to manage groups on Story Protocol.
:param web3 Web3: An instance of Web3.
:param account: The account to use for transactions.
:param chain_id int: The ID of the blockchain network.
"""
def __init__(self, web3: Web3, account, chain_id: int):
self.web3 = web3
self.account = account
self.chain_id = chain_id
self.grouping_module_client = GroupingModuleClient(web3)
self.grouping_workflows_client = GroupingWorkflowsClient(web3)
self.ip_asset_registry_client = IPAssetRegistryClient(web3)
self.core_metadata_module_client = CoreMetadataModuleClient(web3)
self.licensing_module_client = LicensingModuleClient(web3)
self.license_registry_client = LicenseRegistryClient(web3)
self.pi_license_template_client = PILicenseTemplateClient(web3)
self.module_registry_client = ModuleRegistryClient(web3)
self.sign_util = Sign(web3, self.chain_id, self.account)
def register_group(self, group_pool: str, tx_options: dict | None = None) -> dict:
"""
Registers a Group IPA.
:param group_pool str: The address of the group pool.
:param tx_options dict: [Optional] The transaction options.
:return dict: A dictionary with the transaction hash and group ID.
"""
try:
if not self.web3.is_address(group_pool):
raise ValueError(f'Address "{group_pool}" is invalid.')
response = build_and_send_transaction(
self.web3,
self.account,
self.grouping_module_client.build_registerGroup_transaction,
group_pool,
tx_options=tx_options,
)
group_id = self._parse_tx_ip_group_registered_event(response["tx_receipt"])
return {"tx_hash": response["tx_hash"], "group_id": group_id}
except Exception as e:
raise ValueError(f"Failed to register group: {str(e)}")
def register_group_and_attach_license(
self, group_pool: str, license_data: dict, tx_options: dict | None = None
) -> dict:
"""
Register a group IP with a group reward pool and attach license terms to the group IP.
:param group_pool str: The address of the group pool.
:param license_data dict: License data object with terms and config.
:param tx_options dict: [Optional] The transaction options.
:return dict: A dictionary with the transaction hash and group ID.
"""
try:
if not self.web3.is_address(group_pool):
raise ValueError(f'Group pool address "{group_pool}" is invalid.')
# Process license data
license_data_processed = self._get_license_data([license_data])[0]
response = build_and_send_transaction(
self.web3,
self.account,
self.grouping_workflows_client.build_registerGroupAndAttachLicense_transaction,
group_pool,
license_data_processed,
tx_options=tx_options,
)
group_id = self._parse_tx_ip_group_registered_event(response["tx_receipt"])
return {"tx_hash": response["tx_hash"], "group_id": group_id}
except Exception as e:
raise ValueError(f"Failed to register group and attach license: {str(e)}")
def mint_and_register_ip_and_attach_license_and_add_to_group(
self,
group_id: str,
spg_nft_contract: str,
license_data: list,
max_allowed_reward_share: int,
ip_metadata: dict | None = None,
recipient: str | None = None,
allow_duplicates: bool = True,
deadline: int | None = None,
tx_options: dict | None = None,
) -> dict:
"""
Mint an NFT from a SPGNFT collection, register it with metadata as an IP,
attach license terms to the registered IP, and add it to a group IP.
:param group_id str: The ID of the group to add the IP to.
:param spg_nft_contract str: The address of the SPG NFT contract.
:param license_data list: List of license data objects with terms and config.
:param max_allowed_reward_share int: Maximum allowed reward share percentage. Must be between 0 and 100 (where 100% represents 100,000,000).
:param ip_metadata dict: [Optional] The metadata for the IP.
:param recipient str: [Optional] The recipient of the NFT (defaults to caller).
:param allow_duplicates bool: [Optional] Whether to allow duplicate IPs.
:param deadline int: [Optional] The deadline for the signature in seconds. (default: 1000 seconds)
:param tx_options dict: [Optional] The transaction options.
:return dict: A dictionary with the transaction hash, IP ID, and token ID.
"""
try:
if not self.web3.is_address(group_id):
raise ValueError(f'Group ID "{group_id}" is invalid.')
if not self.web3.is_address(spg_nft_contract):
raise ValueError(
f'SPG NFT contract address "{spg_nft_contract}" is invalid.'
)
is_registered = self.ip_asset_registry_client.isRegistered(group_id)
if not is_registered:
raise ValueError(f"Group IP {group_id} is not registered.")
# Get IP account state
ip_account = IPAccountImplClient(self.web3, contract_address=group_id)
state = ip_account.state()
# Calculate deadline
calculated_deadline = self.sign_util.get_deadline(deadline=deadline)
# Get permission signature for adding to group
sig_add_to_group = self.sign_util.get_permission_signature(
ip_id=group_id,
deadline=calculated_deadline,
state=state,
permissions=[
{
"ipId": group_id,
"signer": self.grouping_workflows_client.contract.address,
"to": self.grouping_module_client.contract.address,
"permission": 1, # ALLOW
"func": "addIp(address,address[],uint256)",
}
],
)
licenses_data = self._get_license_data(license_data)
metadata = self._get_ip_metadata(ip_metadata)
max_allowed_reward_share = get_revenue_share(
max_allowed_reward_share, type=RevShareType.MAX_ALLOWED_REWARD_SHARE
)
# Set recipient to caller if not provided
if not recipient:
recipient = self.account.address
response = build_and_send_transaction(
self.web3,
self.account,
self.grouping_workflows_client.build_mintAndRegisterIpAndAttachLicenseAndAddToGroup_transaction,
spg_nft_contract,
group_id,
recipient,
max_allowed_reward_share,
licenses_data,
metadata,
{
"signer": self.account.address,
"deadline": calculated_deadline,
"signature": self.web3.to_bytes(
hexstr=sig_add_to_group["signature"]
),
},
allow_duplicates,
tx_options=tx_options,
)
# Parse events to get IP ID and token ID
registration_data = self._parse_tx_ip_registered_event(
response["tx_receipt"]
)
return {
"tx_hash": response["tx_hash"],
"ip_id": registration_data["ip_id"],
"token_id": registration_data["token_id"],
}
except Exception as e:
raise ValueError(
f"Failed to mint and register IP and attach license and add to group: {str(e)}"
)
def register_ip_and_attach_license_and_add_to_group(
self,
group_id: str,
nft_contract: str,
token_id: int,
license_data: list,
max_allowed_reward_share: int,
ip_metadata: dict | None = None,
deadline: int | None = None,
tx_options: dict | None = None,
) -> dict:
"""
Register an NFT as IP with metadata, attach license terms to the registered IP,
and add it to a group IP.
:param group_id str: The ID of the group to add the IP to.
:param nft_contract str: The address of the NFT contract.
:param token_id int: The token ID of the NFT.
:param license_data list: List of license data objects with terms and config.
:param max_allowed_reward_share int: Maximum allowed reward share percentage. Must be between 0 and 100 (where 100% represents 100,000,000).
:param ip_metadata dict: [Optional] The metadata for the IP.
:param deadline int: [Optional] The deadline for the signature in seconds. (default: 1000 seconds)
:param tx_options dict: [Optional] The transaction options.
:return dict: A dictionary with the transaction hash, IP ID, and token ID.
"""
try:
if not self.web3.is_address(group_id):
raise ValueError(f'Group ID "{group_id}" is invalid.')
if not self.web3.is_address(nft_contract):
raise ValueError(f'NFT contract address "{nft_contract}" is invalid.')
# Check if group is registered
is_registered = self.ip_asset_registry_client.isRegistered(group_id)
if not is_registered:
raise ValueError(f"Group IP {group_id} is not registered.")
# Get IP ID for the NFT
ip_id = self.ip_asset_registry_client.ipId(
self.chain_id, nft_contract, token_id
)
# Get IP account state
ip_account = IPAccountImplClient(self.web3, group_id)
state = ip_account.state()
# Calculate deadline
calculated_deadline = self.sign_util.get_deadline(deadline=deadline)
# Get permission signature for adding to group
sig_add_to_group = self.sign_util.get_permission_signature(
ip_id=group_id,
deadline=calculated_deadline,
state=state,
permissions=[
{
"ipId": group_id,
"signer": self.grouping_workflows_client.contract.address,
"to": self.grouping_module_client.contract.address,
"permission": 1, # ALLOW
"func": "addIp(address,address[],uint256)",
}
],
)
# Get permission signature for metadata and license
sig_metadata_and_attach = self.sign_util.get_permission_signature(
ip_id=ip_id,
deadline=calculated_deadline,
state=self.web3.to_bytes(hexstr=HexStr(ZERO_HASH)),
permissions=[
{
"ipId": ip_id,
"signer": self.grouping_workflows_client.contract.address,
"to": self.core_metadata_module_client.contract.address,
"permission": 1, # ALLOW
"func": "setAll(address,string,bytes32,bytes32)",
},
{
"ipId": ip_id,
"signer": self.grouping_workflows_client.contract.address,
"to": self.licensing_module_client.contract.address,
"permission": 1, # ALLOW
"func": "attachLicenseTerms(address,address,uint256)",
},
{
"ipId": ip_id,
"signer": self.grouping_workflows_client.contract.address,
"to": self.licensing_module_client.contract.address,
"permission": 1, # ALLOW
"func": "setLicensingConfig(address,address,uint256,(bool,uint256,address,bytes,uint32,bool,uint32,address))",
},
],
)
# Process license data
licenses_data = self._get_license_data(license_data)
# Process IP metadata
metadata = self._get_ip_metadata(ip_metadata)
response = build_and_send_transaction(
self.web3,
self.account,
self.grouping_workflows_client.build_registerIpAndAttachLicenseAndAddToGroup_transaction,
nft_contract,
token_id,
group_id,
get_revenue_share(
max_allowed_reward_share, type=RevShareType.MAX_ALLOWED_REWARD_SHARE
),
licenses_data,
metadata,
{
"signer": self.account.address,
"deadline": calculated_deadline,
"signature": self.web3.to_bytes(
hexstr=sig_metadata_and_attach["signature"]
),
},
{
"signer": self.account.address,
"deadline": calculated_deadline,
"signature": self.web3.to_bytes(
hexstr=sig_add_to_group["signature"]
),
},
tx_options=tx_options,
)
# Parse events to get IP ID and token ID
registration_data = self._parse_tx_ip_registered_event(
response["tx_receipt"]
)
return {
"tx_hash": response["tx_hash"],
"ip_id": registration_data["ip_id"],
"token_id": registration_data["token_id"],
}
except Exception as e:
raise ValueError(
f"Failed to register IP and attach license and add to group: {str(e)}"
)
def register_group_and_attach_license_and_add_ips(
self,
group_pool: str,
ip_ids: list,
license_data: dict,
max_allowed_reward_share: int,
tx_options: dict | None = None,
) -> dict:
"""
Register a group IP with a group reward pool, attach license terms to the group IP,
and add individual IPs to the group IP.
:param group_pool str: The address of the group pool.
:param ip_ids list: List of IP IDs to add to the group.
:param license_data dict: License data object with terms and config.
:param max_allowed_reward_share int: Maximum allowed reward share percentage. Must be between 0 and 100 (where 100% represents 100,000,000).
:param tx_options dict: [Optional] The transaction options.
:return dict: A dictionary with the transaction hash and group ID.
"""
try:
if not self.web3.is_address(group_pool):
raise ValueError(f'Group pool address "{group_pool}" is invalid.')
# Validate IP IDs
for ip_id in ip_ids:
if not self.web3.is_address(ip_id):
raise ValueError(f'IP ID "{ip_id}" is invalid.')
is_registered = self.ip_asset_registry_client.isRegistered(ip_id)
if not is_registered:
raise ValueError(f"IP {ip_id} is not registered.")
# Process license data
license_data_processed = self._get_license_data([license_data])[0]
# Check if license terms are attached to all IPs
for ip_id in ip_ids:
is_attached = self.license_registry_client.hasIpAttachedLicenseTerms(
ip_id,
license_data_processed["licenseTemplate"],
license_data_processed["licenseTermsId"],
)
if not is_attached:
raise ValueError(
f"License terms must be attached to IP {ip_id} before adding to group."
)
response = build_and_send_transaction(
self.web3,
self.account,
self.grouping_workflows_client.build_registerGroupAndAttachLicenseAndAddIps_transaction,
group_pool,
ip_ids,
get_revenue_share(
max_allowed_reward_share, type=RevShareType.MAX_ALLOWED_REWARD_SHARE
),
license_data_processed,
tx_options=tx_options,
)
group_id = self._parse_tx_ip_group_registered_event(response["tx_receipt"])
return {"tx_hash": response["tx_hash"], "group_id": group_id}
except Exception as e:
raise ValueError(
f"Failed to register group and attach license and add IPs: {str(e)}"
)
def collect_and_distribute_group_royalties(
self,
group_ip_id: str,
currency_tokens: list,
member_ip_ids: list,
tx_options: dict | None = None,
) -> dict:
"""
Collect royalties for the entire group and distribute the rewards to each member IP's royalty vault.
:param group_ip_id str: The ID of the group IP.
:param currency_tokens list: List of currency token addresses.
:param member_ip_ids list: List of member IP IDs.
:param tx_options dict: [Optional] The transaction options.
:return dict: A dictionary with the transaction hash and collected royalties.
"""
try:
if not self.web3.is_address(group_ip_id):
raise ValueError(f'Group IP ID "{group_ip_id}" is invalid.')
if not currency_tokens:
raise ValueError("At least one currency token is required.")
if not member_ip_ids:
raise ValueError("At least one member IP ID is required.")
# Validate currency tokens
for token in currency_tokens:
if not self.web3.is_address(token):
raise ValueError(f'Currency token address "{token}" is invalid.')
if token == ZERO_ADDRESS:
raise ValueError("Currency token cannot be the zero address.")
# Validate group IP
is_group_registered = self.ip_asset_registry_client.isRegistered(
group_ip_id
)
if not is_group_registered:
raise ValueError(
f"The group IP with ID {group_ip_id} is not registered."
)
# Validate member IPs
for ip_id in member_ip_ids:
if not self.web3.is_address(ip_id):
raise ValueError(f'Member IP ID "{ip_id}" is invalid.')
is_member_registered = self.ip_asset_registry_client.isRegistered(ip_id)
if not is_member_registered:
raise ValueError(f"Member IP with ID {ip_id} is not registered.")
response = build_and_send_transaction(
self.web3,
self.account,
self.grouping_workflows_client.build_collectRoyaltiesAndClaimReward_transaction,
group_ip_id,
currency_tokens,
member_ip_ids,
tx_options=tx_options,
)
# Parse events to get collected royalties
collected_royalties = (
self._parse_tx_collected_royalties_to_group_pool_event(
response["tx_receipt"]
)
)
royalties_distributed = self._parse_tx_royalty_paid_event(
response["tx_receipt"]
)
return {
"tx_hash": response["tx_hash"],
"collected_royalties": collected_royalties,
"royalties_distributed": royalties_distributed,
}
except Exception as e:
raise ValueError(
f"Failed to collect and distribute group royalties: {str(e)}"
)
def claim_rewards(
self,
group_ip_id: Address,
currency_token: Address,
member_ip_ids: list[Address],
tx_options: dict | None = None,
) -> ClaimRewardsResponse:
"""
Claim rewards for the entire group.
:param group_ip_id Address: The ID of the group IP.
:param currency_token Address: The address of the currency (revenue) token to claim.
:param member_ip_ids list[Address]: The IDs of the member IPs to distribute the rewards to.
:param tx_options dict: [Optional] The transaction options.
:return ClaimRewardsResponse: A response object with the transaction hash and claimed rewards.
"""
try:
if not self.web3.is_address(group_ip_id):
raise ValueError(f"Invalid group IP ID: {group_ip_id}")
if not self.web3.is_address(currency_token):
raise ValueError(f"Invalid currency token: {currency_token}")
for ip_id in member_ip_ids:
if not self.web3.is_address(ip_id):
raise ValueError(f"Invalid member IP ID: {ip_id}")
claim_reward_param = {
"groupIpId": group_ip_id,
"token": currency_token,
"memberIpIds": member_ip_ids,
}
response = build_and_send_transaction(
self.web3,
self.account,
self.grouping_module_client.build_claimReward_transaction,
*claim_reward_param.values(),
tx_options=tx_options,
)
event_signature = self.web3.keccak(
text="ClaimedReward(address,address,address[],uint256[])"
).hex()
claimed_rewards = None
for log in response["tx_receipt"]["logs"]:
if log["topics"][0].hex() == event_signature:
event_result = self.grouping_module_client.contract.events.ClaimedReward.process_log(
log
)
claimed_rewards = event_result["args"]
break
if not claimed_rewards:
raise ValueError("Not found ClaimedReward event in transaction logs.")
return ClaimRewardsResponse(
tx_hash=response["tx_hash"],
claimed_rewards=ClaimReward(
ip_ids=claimed_rewards["ipId"],
amounts=claimed_rewards["amount"],
token=claimed_rewards["token"],
group_id=claimed_rewards["groupId"],
),
)
except Exception as e:
raise ValueError(f"Failed to claim rewards: {str(e)}")
def collect_royalties(
self,
group_ip_id: Address,
currency_token: Address,
tx_options: dict | None = None,
) -> CollectRoyaltiesResponse:
"""
Collects royalties into the pool, making them claimable by group member IPs.
:param group_ip_id Address: The ID of the group IP.
:param currency_token Address: The address of the currency (revenue) token to collect.
:param tx_options dict: [Optional] The transaction options.
:return CollectRoyaltiesResponse: A response object with the transaction hash and collected royalties.
"""
try:
if not self.web3.is_address(group_ip_id):
raise ValueError(f"Invalid group IP ID: {group_ip_id}")
if not self.web3.is_address(currency_token):
raise ValueError(f"Invalid currency token: {currency_token}")
response = build_and_send_transaction(
self.web3,
self.account,
self.grouping_module_client.build_collectRoyalties_transaction,
group_ip_id,
currency_token,
tx_options=tx_options,
)
event_signature = self.web3.keccak(
text="CollectedRoyaltiesToGroupPool(address,address,address,uint256)"
).hex()
collected_royalties = 0
for log in response["tx_receipt"]["logs"]:
if log["topics"][0].hex() == event_signature:
event_results = self.grouping_module_client.contract.events.CollectedRoyaltiesToGroupPool.process_log(
log
)
collected_royalties = event_results["args"]["amount"]
break
return CollectRoyaltiesResponse(
tx_hash=response["tx_hash"],
collected_royalties=collected_royalties,
)
except Exception as e:
raise ValueError(f"Failed to collect royalties: {str(e)}")
def get_claimable_reward(
self,
group_ip_id: Address,
currency_token: Address,
member_ip_ids: list[Address],
) -> list[int]:
"""
Returns the available reward for each IP in the group.
:param group_ip_id Address: The ID of the group IP.
:param currency_token Address: The address of the currency (revenue) token to check.
:param member_ip_ids list[Address]: The IDs of the member IPs to check claimable rewards for.
:return list[int]: A list of claimable reward amounts corresponding to each member IP ID.
"""
try:
if not self.web3.is_address(group_ip_id):
raise ValueError(f"Invalid group IP ID: {group_ip_id}")
if not self.web3.is_address(currency_token):
raise ValueError(f"Invalid currency token: {currency_token}")
for ip_id in member_ip_ids:
if not self.web3.is_address(ip_id):
raise ValueError(f"Invalid member IP ID: {ip_id}")
claimable_rewards = self.grouping_module_client.getClaimableReward(
groupId=group_ip_id,
token=currency_token,
ipIds=member_ip_ids,
)
return claimable_rewards
except Exception as e:
raise ValueError(f"Failed to get claimable rewards: {str(e)}")
def _get_license_data(self, license_data: list) -> list:
"""
Process license data into the format expected by the contracts.
:param license_data list: List of license data objects.
:return list: Processed license data.
"""
if not license_data:
raise ValueError("License data is required.")
result = []
for item in license_data:
# Check if license_template is provided
if "license_template" in item:
license_template = item["license_template"]
else:
license_template = self.pi_license_template_client.contract.address
if not self.web3.is_address(license_template):
raise ValueError(
f'License template address "{license_template}" is invalid.'
)
processed_item = {
"licenseTemplate": license_template,
"licenseTermsId": item["license_terms_id"],
"licensingConfig": LicensingConfigData.validate_license_config(
self.module_registry_client, item.get("licensing_config", {})
),
}
result.append(processed_item)
return result
def _get_ip_metadata(self, ip_metadata: dict | None = None) -> dict:
"""
Process IP metadata into the format expected by the contracts.
:param ip_metadata dict: [Optional] The metadata for the IP.
:return dict: Processed IP metadata.
"""
metadata = {
"ipMetadataURI": "",
"ipMetadataHash": ZERO_HASH,
"nftMetadataURI": "",
"nftMetadataHash": ZERO_HASH,
}
if ip_metadata:
metadata.update(
{
"ipMetadataURI": ip_metadata.get("ip_metadata_uri", ""),
"ipMetadataHash": ip_metadata.get("ip_metadata_hash", ZERO_HASH),
"nftMetadataURI": ip_metadata.get("nft_metadata_uri", ""),
"nftMetadataHash": ip_metadata.get("nft_metadata_hash", ZERO_HASH),
}
)
return metadata
def _parse_tx_ip_group_registered_event(self, tx_receipt: dict) -> str:
"""
Parse the IPGroupRegistered event from a transaction receipt.
:param tx_receipt dict: The transaction receipt.
:return str: The group ID.
:raises ValueError: If the event is not found in the transaction receipt.
"""
event_signature = self.web3.keccak(
text="IPGroupRegistered(address,address)"
).hex()
for log in tx_receipt["logs"]:
if log["topics"][0].hex() == event_signature:
group_id = "0x" + log["topics"][1].hex()[24:]
return self.web3.to_checksum_address(group_id)
raise ValueError("IPGroupRegistered event not found in transaction receipt")
def _parse_tx_ip_registered_event(self, tx_receipt: dict) -> dict:
"""
Parse the IPRegistered event from a transaction receipt.
:param tx_receipt dict: The transaction receipt.
:return dict: The IP ID and token ID.
:raises ValueError: If the event is not found in the transaction receipt.
"""
event_signature = self.web3.keccak(
text="IPRegistered(address,uint256,address,uint256,string,string,uint256)"
).hex()
for log in tx_receipt["logs"]:
if log["topics"][0].hex() == event_signature:
ip_id = "0x" + log["data"].hex()[24:64]
token_id = int(log["topics"][3].hex(), 16)
return {
"ip_id": self.web3.to_checksum_address(ip_id),
"token_id": token_id,
}
raise ValueError("IPRegistered event not found in transaction receipt")
def _parse_tx_collected_royalties_to_group_pool_event(
self, tx_receipt: dict
) -> list:
"""
Parse the CollectedRoyaltiesToGroupPool event from a transaction receipt.
:param tx_receipt dict: The transaction receipt.
:return list: List of collected royalties.
"""
event_signature = self.web3.keccak(
text="CollectedRoyaltiesToGroupPool(address,address,address,uint256)"
).hex()
collected_royalties = []
for log in tx_receipt["logs"]:
if log["topics"][0].hex() == event_signature:
group_id = "0x" + log["topics"][1].hex()[24:]
amount = int(log["data"][:66].hex(), 16)
token = "0x" + log["topics"][2].hex()[24:]
collected_royalties.append(
{
"group_id": self.web3.to_checksum_address(group_id),
"amount": amount,
"token": self.web3.to_checksum_address(token),
}
)
return collected_royalties
def _parse_tx_royalty_paid_event(self, tx_receipt: dict) -> list:
"""
Parse the RoyaltyPaid event from a transaction receipt.
:param tx_receipt dict: The transaction receipt.
:return list: List of royalties distributed.
"""
event_signature = self.web3.keccak(
text="RoyaltyPaid(address,address,address,address,uint256,uint256)"
).hex()
royalties_distributed = []
for log in tx_receipt["logs"]:
if log["topics"][0].hex() == event_signature:
receiver_ip_id = "0x" + log["topics"][0].hex()[24:]
data = log["data"]
amount = int(data[128:160].hex(), 16)
token = "0x" + data[108:128].hex()
amount_after_fee = int(data[160:].hex(), 16)
royalties_distributed.append(
{
"ip_id": self.web3.to_checksum_address(receiver_ip_id),
"amount": amount,
"token": self.web3.to_checksum_address(token),
"amount_after_fee": amount_after_fee,
}
)
return royalties_distributed