1
+ import os
2
+ import asyncio
3
+ from metaapi_cloud_sdk import MetaApi
4
+ from metaapi_cloud_sdk import CopyFactory
5
+ from metaapi_cloud_sdk .metaapi .models import format_date
6
+ from datetime import datetime
7
+ import json
8
+ import httpx
9
+
10
+ # your MetaApi API token
11
+ token = os .getenv ('TOKEN' ) or '<put in your token here>'
12
+
13
+ # your provider MetaApi account id
14
+ # provider account must have PROVIDER value in copyFactoryRoles
15
+ provider_account_id = os .getenv ('PROVIDER_ACCOUNT_ID' ) or '<put in your provider account id here>'
16
+
17
+ copyfactory = CopyFactory (token )
18
+
19
+
20
+ async def webhooks_example ():
21
+ api = MetaApi (token )
22
+ try :
23
+ configuration_api = copyfactory .configuration_api
24
+ strategies = await configuration_api .get_strategies_with_infinite_scroll_pagination ()
25
+ strategy = next ((s for s in strategies if s ['accountId' ] == provider_account_id ), None )
26
+ strategy_id = None
27
+ if strategy :
28
+ strategy_id = strategy ['_id' ]
29
+ else :
30
+ strategy_id = await configuration_api .generate_strategy_id ()
31
+ strategy_id = strategy_id ['id' ]
32
+
33
+ print (f'Creating a strategy { strategy_id } if it does not exist' )
34
+ await configuration_api .update_strategy (strategy_id , {
35
+ 'name' : 'Test strategy' ,
36
+ 'description' : 'Some useful description about your strategy' ,
37
+ 'accountId' : provider_account_id
38
+ })
39
+
40
+ print ('Creating a webhook' )
41
+ webhook = await configuration_api .create_webhook (strategy_id , {
42
+ 'symbolMapping' : [{'from' : 'EURUSD.m' , 'to' : 'EURUSD' }], 'magic' : 100
43
+ })
44
+ print ('Created webhook' , webhook )
45
+
46
+ print ('Updating webhook' )
47
+ await configuration_api .update_webhook (strategy_id , webhook ['id' ], {
48
+ 'symbolMapping' : [
49
+ {'from' : 'EURUSD.m' , 'to' : 'EURUSD' },
50
+ {'from' : 'BTCUSD.m' , 'to' : 'BTCUSD' }
51
+ ],
52
+ 'magic' : 100
53
+ })
54
+
55
+ print ('Retrieving webhooks with infinite scroll pagination' )
56
+ webhooks1 = await configuration_api .get_webhooks_with_infinite_scroll_pagination (strategy_id )
57
+ print ('Retrieved webhooks' , webhooks1 )
58
+
59
+ print ('Retrieving webhooks with classic pagination' )
60
+ webhooks2 = await configuration_api .get_webhooks_with_classic_scroll_pagination (strategy_id )
61
+ print ('Retrieved webhooks' , webhooks2 )
62
+
63
+ print ('Sending a trading signal to the webhook. Curl command:' )
64
+ payload = {
65
+ 'symbol' : 'EURUSD' ,
66
+ 'type' : 'POSITION_TYPE_BUY' ,
67
+ 'time' : format_date (datetime .now ()),
68
+ 'volume' : 0.1
69
+ }
70
+
71
+ print (f"curl -X POST --header 'Content-Type: application/json' --header 'Accept: application/json' -d " +
72
+ f"'{ json .dumps (payload )} ' '{ webhook ['url' ]} '" )
73
+ async with httpx .AsyncClient () as client :
74
+ response = (await client .post (webhook ['url' ], json = payload )).json ()
75
+ print ('Sent the signal, signal ID: ' + response ['signalId' ])
76
+
77
+ print ('Deleting webhook ' + webhook ['id' ])
78
+ await configuration_api .delete_webhook (strategy_id , webhook ['id' ])
79
+ except Exception as err :
80
+ print (api .format_error (err ))
81
+
82
+ asyncio .run (webhooks_example ())
0 commit comments