Toggle Light / Dark / Auto color theme
Toggle table of contents sidebar
Source code for tgram_dnd.flows.message_flow
from tgram_dnd.blocks.message_block import MessageBlock
from tgram import TgBot , filters
from tgram.types import Message
from typing import List , Optional , Union
import tgram_dnd
[docs]
class MessageFlow:
    '''A flow used to track Messages.

    Every message accepted by ``filter`` is passed through ``blocks`` in order.

    Args:
        blocks (Union[List[:class:`tgram_dnd.blocks.message_block.MessageBlock`], :class:`tgram_dnd.blocks.message_block.MessageBlock`]): The processing blocks. A single block is wrapped into a one-element list; a tuple of blocks is converted to a list.
        filter (`tgram.filters.Filter <https://z44d.github.io/tgram/tgram.html#tgram.filters.Filter>`_, *optional*): Filter for incoming messages. Leave it as ``None`` to trigger on all updates.
    '''

    def __init__(
        self,
        blocks: Union[List[MessageBlock], MessageBlock],
        filter: Optional[filters.Filter] = None,
    ):
        if isinstance(blocks, list):
            # Store the caller's list object itself (no copy), so later
            # mutations of that list by the caller remain visible here.
            self.blocks = blocks
        elif isinstance(blocks, tuple):
            # A tuple of blocks used to be wrapped as a single "block" and
            # then failed at dispatch time; treat it as a sequence instead.
            self.blocks = list(blocks)
        else:
            self.blocks = [blocks]

        # Only a missing filter falls back to the catch-all filter.
        self.filter = filters.all if filter is None else filter

        # Both are populated by inject(); None means "not injected yet".
        self.bot = None
        self.app = None

    def inject(
        self,
        app: "tgram_dnd.app.App"
    ):
        '''Bind this flow to ``app``.

        Stores ``app`` and ``app.bot`` on the flow. Must be called before
        :meth:`load_plugin`.

        Args:
            app (:class:`tgram_dnd.app.App`): The application that owns this flow.
        '''
        self.bot = app.bot
        self.app = app

    def load_plugin(self) -> None:
        '''loads plugin into the bot

        Registers an ``on_message`` handler (using ``self.filter``) on the
        injected bot. The handler injects the app into each block and awaits
        the block's ``exec`` with the bot and the message, in list order.

        Raises:
            RuntimeError: If :meth:`inject` has not been called yet.
        '''
        # getattr() also covers subclasses whose __init__ skipped ours.
        target_bot = getattr(self, "bot", None)
        if target_bot is None:
            raise RuntimeError(
                "MessageFlow.load_plugin() was called before inject(); "
                "call inject(app) first"
            )

        @target_bot.on_message(self.filter)
        async def handle(
            bot: TgBot,
            m: Message
        ):
            # self.blocks is read on every message, so blocks appended after
            # registration still get the app injected and are executed.
            for block in self.blocks:
                block.inject(self.app)
                await block.exec(bot, m)