From ac87680632b4fb6582d1391b042eff7f0305c0a2 Mon Sep 17 00:00:00 2001 From: samelian Date: Sun, 22 May 2011 20:12:04 +0000 Subject: [kdenetwork/kopete] added cmake support git-svn-id: svn://anonsvn.kde.org/home/kde/branches/trinity/kdenetwork@1233119 283d02a7-25f6-0310-bc7c-ecb5cbfe19da --- .../libjingle/talk/third_party/ortp/rtpsession.c | 1954 ++++++++++++++++++++ 1 file changed, 1954 insertions(+) create mode 100644 kopete/protocols/jabber/jingle/libjingle/talk/third_party/ortp/rtpsession.c (limited to 'kopete/protocols/jabber/jingle/libjingle/talk/third_party/ortp/rtpsession.c') diff --git a/kopete/protocols/jabber/jingle/libjingle/talk/third_party/ortp/rtpsession.c b/kopete/protocols/jabber/jingle/libjingle/talk/third_party/ortp/rtpsession.c new file mode 100644 index 00000000..de6b2e7d --- /dev/null +++ b/kopete/protocols/jabber/jingle/libjingle/talk/third_party/ortp/rtpsession.c @@ -0,0 +1,1954 @@ +/* + The oRTP library is an RTP (Realtime Transport Protocol - rfc1889) stack. + Copyright (C) 2001 Simon MORLAT simon.morlat@linphone.org + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2.1 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ + + +#include +#include +#include "rtpmod.h" +#include "jitterctl.h" +#include "scheduler.h" +#include "port_fct.h" +#include "utils.h" + +#include +#include +#include +#include + +#ifndef _WIN32 +# include +# include +# include +# include +# ifdef INET6 +# include +# endif +#else +# include +# include "errno-win32.h" +#endif + + +#if defined(HAVE_POLL_H) +#include +#elif defined(HAVE_SYS_POLL_H) +#include +#endif +#ifdef HAVE_SYS_UIO_H +#include +#define USE_SENDMSG 1 +#endif + + + +void wait_point_init(WaitPoint *wp){ + wp->lock=g_mutex_new(); + wp->cond=g_cond_new(); + wp->time=0; + wp->wakeup=FALSE; +} +void wait_point_uninit(WaitPoint *wp){ + g_cond_free(wp->cond); + g_mutex_free(wp->lock); +} + +#define wait_point_lock(wp) g_mutex_lock((wp)->lock) +#define wait_point_unlock(wp) g_mutex_unlock((wp)->lock) + +void wait_point_wakeup_at(WaitPoint *wp, guint32 t, gboolean dosleep){ + wp->time=t; + wp->wakeup=TRUE; + if (dosleep) g_cond_wait(wp->cond,wp->lock); +} + + +gboolean wait_point_check(WaitPoint *wp, guint32 t){ + gboolean ok=FALSE; + + if (wp->wakeup){ + if (TIME_IS_NEWER_THAN(t,wp->time)){ + wp->wakeup=FALSE; + ok=TRUE; + + } + } + return ok; +} +#define wait_point_wakeup(wp) g_cond_signal((wp)->cond); + +extern void rtp_parse(RtpSession *session, mblk_t *mp, guint32 local_str_ts); + + +static guint32 guint32_random(){ + return random(); +} + +void +rtp_session_init (RtpSession * session, gint mode) +{ + memset (session, 0, sizeof (RtpSession)); + session->lock = g_mutex_new (); + session->rtp.max_rq_size = RTP_MAX_RQ_SIZE; + session->mode = mode; + if ((mode == RTP_SESSION_RECVONLY) || (mode == RTP_SESSION_SENDRECV)) + { + rtp_session_set_flag (session, RTP_SESSION_RECV_SYNC); + rtp_session_set_flag (session, RTP_SESSION_RECV_NOT_STARTED); + + } + if ((mode == RTP_SESSION_SENDONLY) || (mode == RTP_SESSION_SENDRECV)) + { + rtp_session_set_flag (session, RTP_SESSION_SEND_NOT_STARTED); + rtp_session_set_flag (session, RTP_SESSION_SEND_SYNC); + session->send_ssrc=guint32_random(); + /* set default source description */ + rtp_session_set_source_description(session,"unknown@unknown",NULL,NULL, + NULL,NULL,"oRTP-" ORTP_VERSION,"This is free sofware (LGPL) !"); + } + session->telephone_events_pt=-1; /* not defined a priori */ + rtp_session_set_profile (session, &av_profile); /*the default profile to work with */ + session->payload_type=0;/* default to something */ + qinit(&session->rtp.rq); + qinit(&session->rtp.tev_rq); + qinit(&session->contributing_sources); + /* init signal tables */ + rtp_signal_table_init (&session->on_ssrc_changed, session,"ssrc_changed"); + rtp_signal_table_init (&session->on_payload_type_changed, session,"payload_type_changed"); + rtp_signal_table_init (&session->on_telephone_event, session,"telephone-event"); + rtp_signal_table_init (&session->on_telephone_event_packet, session,"telephone-event_packet"); + rtp_signal_table_init (&session->on_timestamp_jump,session,"timestamp_jump"); + rtp_signal_table_init (&session->on_network_error,session,"network_error"); + wait_point_init(&session->send_wp); + wait_point_init(&session->recv_wp); + rtp_session_set_jitter_compensation(session,RTP_DEFAULT_JITTER_TIME); + rtp_session_enable_adaptive_jitter_compensation(session,FALSE); + rtp_session_set_time_jump_limit(session,5000); + session->max_buf_size = UDP_MAX_SIZE; +} + +/** + *rtp_session_new: + *@mode: One of the #RtpSessionMode flags. + * + * Creates a new rtp session. + * If the session is able to send data (RTP_SESSION_SENDONLY or RTP_SESSION_SENDRECV), then a + * random SSRC number is choosed for the outgoing stream. + * + *Returns: the newly created rtp session. +**/ + +RtpSession * +rtp_session_new (gint mode) +{ + RtpSession *session; + session = g_malloc (sizeof (RtpSession)); + rtp_session_init (session, mode); + return session; +} + +/** + *rtp_session_set_scheduling_mode: + *@session: a rtp session. + *@yesno: a boolean to indicate the scheduling mode. + * + * Sets the scheduling mode of the rtp session. If @yesno is TRUE, the rtp session is in + * the scheduled mode, that means that you can use session_set_select() to block until it's time + * to receive or send on this session according to the timestamp passed to the respective functions. + * You can also use blocking mode (see rtp_session_set_blocking_mode() ), to simply block within + * the receive and send functions. + * If @yesno is FALSE, the ortp scheduler will not manage those sessions, meaning that blocking mode + * and the use of session_set_select() for this session are disabled. + * +**/ + +void +rtp_session_set_scheduling_mode (RtpSession * session, gint yesno) +{ + if (yesno) + { + RtpScheduler *sched; + sched = ortp_get_scheduler (); + if (sched != NULL) + { + rtp_session_set_flag (session, RTP_SESSION_SCHEDULED); + session->sched = sched; + rtp_scheduler_add_session (sched, session); + } + else + g_warning + ("rtp_session_set_scheduling_mode: Cannot use scheduled mode because the " + "scheduler is not started. Use ortp_scheduler_init() before."); + } + else + rtp_session_unset_flag (session, RTP_SESSION_SCHEDULED); +} + + +/** + *rtp_session_set_blocking_mode: + *@session: a rtp session + *@yesno: a boolean + * + * Using this function implies that you previously enabled scheduled mode on the session + * (see rtp_session_set_scheduling_mode() ). + * rtp_session_set_blocking_mode() defines the behaviour of the rtp_session_recv_with_ts() and + * rtp_session_send_with_ts() functions. If @yesno is TRUE, rtp_session_recv_with_ts() + * will block until it is time for the packet to be received, according to the timestamp + * passed to the function. After this time, the function returns. + * For rtp_session_send_with_ts(), it will block until it is time for the packet to be sent. + * If @yesno is FALSE, then the two functions will return immediately. + * +**/ +void +rtp_session_set_blocking_mode (RtpSession * session, gint yesno) +{ + if (yesno) + rtp_session_set_flag (session, RTP_SESSION_BLOCKING_MODE); + else + rtp_session_unset_flag (session, RTP_SESSION_BLOCKING_MODE); +} + +/** + *rtp_session_set_profile: + *@session: a rtp session + *@profile: a rtp profile + * + * Set the RTP profile to be used for the session. By default, all session are created by + * rtp_session_new() are initialized with the AV profile, as defined in RFC 1890. The application + * can set any other profile instead using that function. + * + * +**/ + +void +rtp_session_set_profile (RtpSession * session, RtpProfile * profile) +{ + session->profile = profile; + rtp_session_telephone_events_supported(session); +} + + +/** + *rtp_session_signal_connect: + *@session: a rtp session + *@signal: the name of a signal + *@cb: a #RtpCallback + *@user_data: a pointer to any data to be passed when invoking the callback. + * + * This function provides the way for an application to be informed of various events that + * may occur during a rtp session. @signal is a string identifying the event, and @cb is + * a user supplied function in charge of processing it. The application can register + * several callbacks for the same signal, in the limit of #RTP_CALLBACK_TABLE_MAX_ENTRIES. + * Here are name and meaning of supported signals types: + * + * "ssrc_changed" : the SSRC of the incoming stream has changed. + * + * "payload_type_changed" : the payload type of the incoming stream has changed. + * + * "telephone-event_packet" : a telephone-event rtp packet (RFC2833) is received. + * + * "telephone-event" : a telephone event has occured. This is a high-level shortcut for "telephone-event_packet". + * + * "network_error" : a network error happened on a socket. Arguments of the callback functions are + * a const char * explaining the error, an int errno error code and the user_data as usual. + * + * "timestamp_jump" : we have received a packet with timestamp in far future compared to last timestamp received. + * The farness of far future is set by rtp_sesssion_set_time_jump_limit() + * + * Returns: 0 on success, -EOPNOTSUPP if the signal does not exists, -1 if no more callbacks + * can be assigned to the signal type. +**/ +int +rtp_session_signal_connect (RtpSession * session, const char *signal, + RtpCallback cb, gpointer user_data) +{ + OList *elem; + for (elem=session->signal_tables;elem!=NULL;elem=o_list_next(elem)){ + RtpSignalTable *s=(RtpSignalTable*) elem->data; + if (strcmp(signal,s->signal_name)==0){ + return rtp_signal_table_add(s,cb,user_data); + } + } + g_warning ("rtp_session_signal_connect: inexistant signal %s",signal); + return -1; +} + + +/** + *rtp_session_signal_disconnect_by_callback: + *@session: a rtp session + *@signal: a signal name + *@cb: a callback function. + * + * Removes callback function @cb to the list of callbacks for signal @signal. + * + *Returns: 0 on success, -ENOENT if the callbacks was not found. +**/ + +int +rtp_session_signal_disconnect_by_callback (RtpSession * session, const gchar *signal, + RtpCallback cb) +{ + OList *elem; + for (elem=session->signal_tables;elem!=NULL;elem=o_list_next(elem)){ + RtpSignalTable *s=(RtpSignalTable*) elem->data; + if (strcmp(signal,s->signal_name)==0){ + return rtp_signal_table_remove_by_callback(s,cb); + } + } + g_warning ("rtp_session_signal_connect: inexistant signal %s",signal); + return -1; +} + +/** + *rtp_session_set_local_addr: + *@session: a rtp session freshly created. + *@addr: a local IP address in the xxx.xxx.xxx.xxx form. + *@port: a local port. + * + * Specify the local addr to be use to listen for rtp packets or to send rtp packet from. + * In case where the rtp session is send-only, then it is not required to call this function: + * when calling rtp_session_set_remote_addr(), if no local address has been set, then the + * default INADRR_ANY (0.0.0.0) IP address with a random port will be used. Calling + * rtp_sesession_set_local_addr() is mandatory when the session is recv-only or duplex. + * + * Returns: 0 on success. +**/ + +gint +rtp_session_set_local_addr (RtpSession * session, const gchar * addr, gint port) +{ + gint err; + gint optval = 1; +#ifdef INET6 + char num[8]; + struct addrinfo hints, *res0, *res; +#endif + + if (session->rtp.socket>0) { + /* dont try to rebind, close socket before */ + close_socket(session->rtp.socket); + close_socket(session->rtcp.socket); + session->rtp.socket=0; + session->rtcp.socket=0; + } + +#ifdef INET6 + + memset(&hints, 0, sizeof(hints)); + hints.ai_family = PF_UNSPEC; + hints.ai_socktype = SOCK_DGRAM; + snprintf(num, sizeof(num), "%d",port); + err = getaddrinfo(addr,num, &hints, &res0); + if (err!=0) { + g_warning ("Error: %s", gai_strerror(err)); + return err; + } + + for (res = res0; res; res = res->ai_next) { + session->rtp.socket = socket(res->ai_family, res->ai_socktype, 0); + if (session->rtp.socket < 0) + continue; + + err = setsockopt (session->rtp.socket, SOL_SOCKET, SO_REUSEADDR, + (void*)&optval, sizeof (optval)); + if (err < 0) + { + g_warning ("Fail to set rtp address reusable: %s.", getSocketError()); + } + + session->rtp.socktype=res->ai_family; + memcpy(&session->rtp.loc_addr, res->ai_addr, res->ai_addrlen); + err = bind (session->rtp.socket, res->ai_addr, res->ai_addrlen); + if (err != 0) + { + g_warning ("Fail to bind rtp socket to port %i: %s.", port, getSocketError()); + close_socket (session->rtp.socket); + continue; + } +#ifndef __hpux + switch (res->ai_family) + { + case AF_INET: + if (IN_MULTICAST(ntohl(((struct sockaddr_in *) res->ai_addr)->sin_addr.s_addr))) + { + struct ip_mreq mreq; + mreq.imr_multiaddr.s_addr = ((struct sockaddr_in *) res->ai_addr)->sin_addr.s_addr; + mreq.imr_interface.s_addr = INADDR_ANY; + err = setsockopt(session->rtp.socket, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)); + if (err < 0) + { + g_warning ("Fail to join address group: %s.", getSocketError()); + close_socket (session->rtp.socket); + continue; + } + } + break; + case AF_INET6: + if (IN6_IS_ADDR_MULTICAST(&(((struct sockaddr_in6 *) res->ai_addr)->sin6_addr))) + { + struct ipv6_mreq mreq; + mreq.ipv6mr_multiaddr = ((struct sockaddr_in6 *) res->ai_addr)->sin6_addr; + mreq.ipv6mr_interface = 0; + err = setsockopt(session->rtp.socket, IPPROTO_IPV6, IPV6_JOIN_GROUP, &mreq, sizeof(mreq)); + if (err < 0) + { + g_warning ("Fail to join address group: %s.", getSocketError()); + close_socket (session->rtp.socket); + continue; + } + } + break; + } +#endif + break; + } + freeaddrinfo(res0); + if (session->rtp.socket < 0){ + if (session->mode==RTP_SESSION_RECVONLY) g_warning("Could not create rtp socket with address %s: %s",addr,getSocketError()); + return -1; + } + + memset(&hints, 0, sizeof(hints)); + hints.ai_family = PF_UNSPEC; + hints.ai_socktype = SOCK_DGRAM; + snprintf(num, sizeof(num), "%d", (port + 1)); + + err = getaddrinfo(addr, num, &hints, &res0); + if (err!=0) { + g_warning ("Error: %s", gai_strerror(err)); + return err; + } + + for (res = res0; res; res = res->ai_next) { + session->rtcp.socket = socket(res->ai_family, res->ai_socktype, 0); + + if (session->rtcp.socket < 0) + continue; + + err = setsockopt (session->rtcp.socket, SOL_SOCKET, SO_REUSEADDR, + (void*)&optval, sizeof (optval)); + if (err < 0) + { + g_warning ("Fail to set rtcp address reusable: %s.",getSocketError()); + } + session->rtcp.socktype=res->ai_family; + memcpy( &session->rtcp.loc_addr, res->ai_addr, res->ai_addrlen); + err = bind (session->rtcp.socket, res->ai_addr, res->ai_addrlen); + if (err != 0) + { + g_warning ("Fail to bind rtp socket to port %i: %s.", port, getSocketError()); + close_socket (session->rtp.socket); + close_socket (session->rtcp.socket); + continue; + } +#ifndef __hpux + switch (res->ai_family) + { + case AF_INET: + if (IN_MULTICAST(ntohl(((struct sockaddr_in *) res->ai_addr)->sin_addr.s_addr))) + { + struct ip_mreq mreq; + mreq.imr_multiaddr.s_addr = ((struct sockaddr_in *) res->ai_addr)->sin_addr.s_addr; + mreq.imr_interface.s_addr = INADDR_ANY; + err = setsockopt(session->rtcp.socket, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)); + if (err < 0) + { + g_warning ("Fail to join address group: %s.", getSocketError()); + close_socket (session->rtp.socket); + close_socket (session->rtcp.socket); + continue; + } + } + break; + case AF_INET6: + if (IN6_IS_ADDR_MULTICAST(&(((struct sockaddr_in6 *) res->ai_addr)->sin6_addr))) + { + struct ipv6_mreq mreq; + mreq.ipv6mr_multiaddr = ((struct sockaddr_in6 *) res->ai_addr)->sin6_addr; + mreq.ipv6mr_interface = 0; + err = setsockopt(session->rtcp.socket, IPPROTO_IPV6, IPV6_JOIN_GROUP, &mreq, sizeof(mreq)); + if (err < 0) + { + g_warning ("Fail to join address group: %s.", getSocketError()); + close_socket (session->rtp.socket); + close_socket (session->rtcp.socket); + continue; + } + } + break; + } +#endif + + break; + } + freeaddrinfo(res0); + if (session->rtp.socket < 0){ + g_warning("Could not create rtcp socket with address %s: %s",addr,getSocketError()); + return -1; + } +#else + session->rtp.loc_addr.sin_family = AF_INET; + + err = inet_aton (addr, &session->rtp.loc_addr.sin_addr); + + if (err < 0) + { + g_warning ("Error in socket address:%s.", getSocketError()); + return err; + } + session->rtp.loc_addr.sin_port = htons (port); + + session->rtp.socket = socket (PF_INET, SOCK_DGRAM, 0); + g_return_val_if_fail (session->rtp.socket > 0, -1); + + err = setsockopt (session->rtp.socket, SOL_SOCKET, SO_REUSEADDR, + (void*)&optval, sizeof (optval)); + if (err < 0) + { + g_warning ("Fail to set rtp address reusable: %s.",getSocketError()); + } + + err = bind (session->rtp.socket, + (struct sockaddr *) &session->rtp.loc_addr, + sizeof (struct sockaddr_in)); + + if (err != 0) + { + g_warning ("Fail to bind rtp socket to port %i: %s.", port, getSocketError()); + close_socket (session->rtp.socket); + return -1; + } + memcpy (&session->rtcp.loc_addr, &session->rtp.loc_addr, + sizeof (struct sockaddr_in)); + session->rtcp.loc_addr.sin_port = htons (port + 1); + session->rtcp.socket = socket (PF_INET, SOCK_DGRAM, 0); + g_return_val_if_fail (session->rtcp.socket > 0, -1); + + err = setsockopt (session->rtcp.socket, SOL_SOCKET, SO_REUSEADDR, + (void*)&optval, sizeof (optval)); + if (err < 0) + { + g_warning ("Fail to set rtcp address reusable: %s.",getSocketError()); + } + + err = bind (session->rtcp.socket, + (struct sockaddr *) &session->rtcp.loc_addr, + sizeof (struct sockaddr_in)); + if (err != 0) + { + g_warning ("Fail to bind rtcp socket to port %i: %s.", port + 1, getSocketError()); + close_socket (session->rtp.socket); + close_socket (session->rtcp.socket); + return -1; + } +#ifndef __hpux + if (IN_MULTICAST(ntohl(session->rtp.loc_addr.sin_addr.s_addr))) + { + struct ip_mreq mreq; + mreq.imr_multiaddr.s_addr = session->rtp.loc_addr.sin_addr.s_addr; + mreq.imr_interface.s_addr = INADDR_ANY; + err = setsockopt(session->rtp.socket, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)); + if (err == 0) + err = setsockopt(session->rtcp.socket, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)); + if (err < 0) + { + g_warning ("Fail to join address group: %s.", getSocketError()); + close_socket (session->rtp.socket); + close_socket (session->rtcp.socket); + return -1; + } + } +#endif +#endif + /* set RTP socket options */ + set_non_blocking_socket (session->rtp.socket); + /* set RTCP socket options */ + set_non_blocking_socket (session->rtcp.socket); + return 0; +} + + +/** + *rtp_session_set_remote_addr: + *@session: a rtp session freshly created. + *@addr: a local IP address in the xxx.xxx.xxx.xxx form. + *@port: a local port. + * + * Sets the remote address of the rtp session, ie the destination address where rtp packet + * are sent. If the session is recv-only or duplex, it also sets the origin of incoming RTP + * packets. Rtp packets that don't come from addr:port are discarded. + * + * Returns: 0 on success. +**/ + +gint +rtp_session_set_remote_addr (RtpSession * session, const gchar * addr, gint port) +{ + gint err; +#ifdef INET6 + struct addrinfo hints, *res0, *res; + char num[8]; + memset(&hints, 0, sizeof(hints)); + hints.ai_family = PF_UNSPEC; + hints.ai_socktype = SOCK_DGRAM; + snprintf(num, sizeof(num), "%d", port); + err = getaddrinfo(addr, num, &hints, &res0); + if (err) { + g_warning ("Error in socket address: %s", gai_strerror(err)); + return err; + } +#endif + + if (session->rtp.socket == 0) + { + int retry; + /* the session has not its socket bound, do it */ + g_message ("Setting random local addresses."); + for (retry=0;retry<10;retry++) + { + int localport; + do + { + localport = (rand () + 5000) & 0xfffe; + } + while ((localport < 5000) || (localport > 0xffff)); +#ifdef INET6 + /* bind to an address type that matches the destination address */ + if (res0->ai_addr->sa_family==AF_INET6) + err = rtp_session_set_local_addr (session, "::", localport); + else err=rtp_session_set_local_addr (session, "0.0.0.0", localport); +#else + err = rtp_session_set_local_addr (session, "0.0.0.0", localport); +#endif + + if (err == 0) + break; + } + if (retry == 10){ + g_warning("rtp_session_set_remote_addr: Could not find a random local address for socket !"); + return -1; + } + } + + +#ifdef INET6 + err=1; + for (res = res0; res; res = res->ai_next) { + /* set a destination address that has the same type as the local address */ + if (res->ai_family==session->rtp.socktype ) { + memcpy( &session->rtp.rem_addr, res->ai_addr, res->ai_addrlen); + session->rtp.addrlen=res->ai_addrlen; + err=0; + break; + } + } + freeaddrinfo(res0); + if (err) { + g_warning("Could not set destination for RTP socket to %s:%i.",addr,port); + return -1; + } + + memset(&hints, 0, sizeof(hints)); + hints.ai_family = PF_UNSPEC; + hints.ai_socktype = SOCK_DGRAM; + snprintf(num, sizeof(num), "%d", (port + 1)); + err = getaddrinfo(addr, num, &hints, &res0); + if (err) { + g_warning ("Error: %s", gai_strerror(err)); + return err; + } + err=1; + for (res = res0; res; res = res->ai_next) { + /* set a destination address that has the same type as the local address */ + if (res->ai_family==session->rtp.socktype ) { + err=0; + memcpy( &session->rtcp.rem_addr, res->ai_addr, res->ai_addrlen); + session->rtcp.addrlen=res->ai_addrlen; + break; + } + } + freeaddrinfo(res0); + if (err) { + g_warning("Could not set destination for RCTP socket to %s:%i.",addr,port+1); + return -1; + } +#else + session->rtp.addrlen=sizeof(session->rtp.rem_addr); + session->rtp.rem_addr.sin_family = AF_INET; + + err = inet_aton (addr, &session->rtp.rem_addr.sin_addr); + if (err < 0) + { + g_warning ("Error in socket address:%s.", getSocketError()); + return err; + } + session->rtp.rem_addr.sin_port = htons (port); + + memcpy (&session->rtcp.rem_addr, &session->rtp.rem_addr, + sizeof (struct sockaddr_in)); + session->rtcp.rem_addr.sin_port = htons (port + 1); + session->rtcp.addrlen=sizeof(session->rtp.rem_addr); +#endif +#ifndef NOCONNECT + if (session->mode == RTP_SESSION_SENDONLY) + { + err = connect (session->rtp.socket, + (struct sockaddr *) &session->rtp.rem_addr, +#ifdef INET6 + session->rtp.addrlen); +#else + sizeof (struct sockaddr_in)); +#endif + if (err != 0) + { + g_message ("Can't connect rtp socket: %s.",getSocketError()); + return err; + } + err = connect (session->rtcp.socket, + (struct sockaddr *) &session->rtcp.rem_addr, +#ifdef INET6 + session->rtcp.addrlen); +#else + sizeof (struct sockaddr_in)); +#endif + if (err != 0) + { + g_message ("Can't connect rtp socket: %s.",getSocketError()); + return err; + } + } +#endif + return 0; +} + +void rtp_session_set_sockets(RtpSession *session, gint rtpfd, gint rtcpfd) +{ + if (rtpfd>0) set_non_blocking_socket(rtpfd); + if (rtcpfd>0) set_non_blocking_socket(rtcpfd); + session->rtp.socket=rtpfd; + session->rtcp.socket=rtcpfd; + session->flags|=RTP_SESSION_USING_EXT_SOCKETS; +} + +/** + *rtp_session_flush_sockets: + *@session: a rtp session + * + * Flushes the sockets for all pending incoming packets. + * This can be usefull if you did not listen to the stream for a while + * and wishes to start to receive again. During the time no receive is made + * packets get bufferised into the internal kernel socket structure. + * +**/ +void rtp_session_flush_sockets(RtpSession *session){ + char trash[4096]; +#ifdef INET6 + struct sockaddr_storage from; +#else + struct sockaddr from; +#endif + socklen_t fromlen=sizeof(from); + if (session->rtp.socket>0){ + while (recvfrom(session->rtp.socket,(void*)trash,sizeof(trash),0,(struct sockaddr *)&from,&fromlen)>0){}; + } + if (session->rtcp.socket>0){ + while (recvfrom(session->rtcp.socket,(void*)trash,sizeof(trash),0,(struct sockaddr*)&from,&fromlen)>0){}; + } +} + +/** + *rtp_session_set_seq_number: + *@session: a rtp session freshly created. + *@addr: a 16 bit unsigned number. + * + * sets the initial sequence number of a sending session. + * +**/ +void rtp_session_set_seq_number(RtpSession *session, guint16 seq){ + session->rtp.snd_seq=seq; +} + + +guint16 rtp_session_get_seq_number(RtpSession *session){ + return session->rtp.snd_seq; +} + + +#ifdef USE_SENDMSG +#define MAX_IOV 10 +static gint rtp_sendmsg(int sock,mblk_t *m, struct sockaddr *rem_addr, int addr_len){ + int error; + struct msghdr msg; + struct iovec iov[MAX_IOV]; + int iovlen; + for(iovlen=0; iovlenb_cont,iovlen++){ + iov[iovlen].iov_base=m->b_rptr; + iov[iovlen].iov_len=m->b_wptr-m->b_rptr; + } + msg.msg_name=(void*)rem_addr; + msg.msg_namelen=addr_len; + msg.msg_iov=&iov[0]; + msg.msg_iovlen=iovlen; + msg.msg_control=NULL; + msg.msg_controllen=0; + msg.msg_flags=0; + + error=sendmsg(sock,&msg,0); + return error; +} +#endif + +static gint +ortp_rtp_send (RtpSession * session, mblk_t * m) +{ + gint error; + int i; + rtp_header_t *hdr; + + hdr = (rtp_header_t *) m->b_rptr; + /* perform host to network conversions */ + hdr->ssrc = htonl (hdr->ssrc); + hdr->timestamp = htonl (hdr->timestamp); + hdr->seq_number = htons (hdr->seq_number); + for (i = 0; i < hdr->cc; i++) + hdr->csrc[i] = htonl (hdr->csrc[i]); + +#ifdef USE_SENDMSG + if (session->flags & RTP_SESSION_USING_EXT_SOCKETS){ + error=rtp_sendmsg(session->rtp.socket,m,(struct sockaddr *)NULL,0); + }else { + error=rtp_sendmsg(session->rtp.socket,m,(struct sockaddr *) &session->rtp.rem_addr, + session->rtp.addrlen); + } +#else + if (m->b_cont!=NULL){ + mblk_t *newm=msgpullup(m,-1); + freemsg(m); + m=newm; + } + if (session->flags & RTP_SESSION_USING_EXT_SOCKETS){ + error=send(session->rtp.socket, m->b_rptr, (m->b_wptr - m->b_rptr),0); + }else error = sendto (session->rtp.socket, m->b_rptr, + (m->b_wptr - m->b_rptr), 0, + (struct sockaddr *) &session->rtp.rem_addr, + session->rtp.addrlen); +#endif + if (error < 0){ + if (session->on_network_error.count>0){ + rtp_signal_table_emit3(&session->on_network_error,(gpointer)"Error sending RTP packet",INT_TO_POINTER(getSocketErrorCode())); + }else g_warning ("Error sending rtp packet: %s ; socket=%i", getSocketError(), session->rtp.socket); + } + freemsg (m); + return error; +} + +gint +ortp_rtcp_send (RtpSession * session, mblk_t * m) +{ + gint error=0; + gboolean using_ext_socket=(session->flags & RTP_SESSION_USING_EXT_SOCKETS)!=0; + if ( (using_ext_socket && session->rtcp.socket>0 ) || session->rtcp.addrlen>0){ + +#ifndef USE_SENDMSG + if (m->b_cont!=NULL){ + mblk_t *newm=msgpullup(m,-1); + freemsg(m); + m=newm; + } +#endif + if (using_ext_socket && session->rtcp.socket>0 ){ +#ifdef USE_SENDMSG + error=rtp_sendmsg(session->rtcp.socket,m,(struct sockaddr *)NULL,0); +#else + error=send(session->rtcp.socket, m->b_rptr, (m->b_wptr - m->b_rptr),0); +#endif + }else { +#ifdef USE_SENDMSG + error=rtp_sendmsg(session->rtcp.socket,m,(struct sockaddr *) &session->rtcp.rem_addr, + session->rtcp.addrlen); +#else + error = sendto (session->rtcp.socket, m->b_rptr, + (m->b_wptr - m->b_rptr), 0, + (struct sockaddr *) &session->rtcp.rem_addr, + session->rtcp.addrlen); +#endif + } + + if (error < 0){ + if (session->on_network_error.count>0){ + rtp_signal_table_emit3(&session->on_network_error,(gpointer)"Error sending RTCP packet",INT_TO_POINTER(getSocketErrorCode())); + }else g_warning ("Error sending rtcp packet: %s ; socket=%i", getSocketError(), session->rtcp.socket); + } + }else g_warning("Cannot send rtcp report because I don't know the remote address."); + freemsg (m); + return error; +} + + +/** + *rtp_session_set_ssrc: + *@session: a rtp session. + *@ssrc: an unsigned 32bit integer representing the synchronisation source identifier (SSRC). + * + * Sets the SSRC for the outgoing stream. + * If not done, a random ssrc is used. + * +**/ +void +rtp_session_set_ssrc (RtpSession * session, guint32 ssrc) +{ + session->send_ssrc = ssrc; +} + +/* this function initialize all session parameter's that depend on the payload type */ +static void payload_type_changed(RtpSession *session, PayloadType *pt){ + jitter_control_set_payload(&session->rtp.jittctl,pt); + session->rtp.rtcp_report_snt_interval=RTCP_DEFAULT_REPORT_INTERVAL*pt->clock_rate; + rtp_session_set_time_jump_limit(session,session->rtp.time_jump); +} + +/** + *rtp_session_set_payload_type: + *@session: a rtp session + *@paytype: the payload type + * + * Sets the payload type of the rtp session. It decides of the payload types written in the + * of the rtp header for the outgoing stream, if the session is SENDRECV or SENDONLY. + * For the incoming stream, it sets the waited payload type. If that value does not match + * at any time this waited value, then the application can be informed by registering + * for the "payload_type_changed" signal, so that it can make the necessary changes + * on the downstream decoder that deals with the payload of the packets. + * + *Returns: 0 on success, -1 if the payload is not defined. +**/ + +int +rtp_session_set_payload_type (RtpSession * session, int paytype) +{ + PayloadType *pt; + session->payload_type = paytype; + pt=rtp_profile_get_payload(session->profile,paytype); + if (pt!=NULL){ + payload_type_changed(session,pt); + } + return 0; +} + +int rtp_session_get_payload_type(RtpSession *session){ + return session->payload_type; +} + + +/** + *rtp_session_set_payload_type_with_string: + *@session: a rtp session + *@paytype: the payload type + * + * Sets the payload type of the rtp session. It decides of the payload types written in the + * of the rtp header for the outgoing stream, if the session is SENDRECV or SENDONLY. + * Unlike #rtp_session_set_payload_type(), it takes as argument a string referencing the + * payload type (mime type). + * For the incoming stream, it sets the waited payload type. If that value does not match + * at any time this waited value, then the application can be informed by registering + * for the "payload_type_changed" signal, so that it can make the necessary changes + * on the downstream decoder that deals with the payload of the packets. + * + *Returns: 0 on success, -1 if the payload is not defined. +**/ + +int +rtp_session_set_payload_type_with_string (RtpSession * session, const char * mime) +{ + int pt; + pt=rtp_profile_get_payload_number_from_mime(session->profile,mime); + if (pt<0) { + g_warning("%s is not a know mime string within the rtpsession's profile.",mime); + return -1; + } + rtp_session_set_payload_type(session,pt); + return 0; +} + + +/** + *rtp_session_create_packet: + *@session: a rtp session. + *@header_size: the rtp header size. For standart size (without extensions), it is #RTP_FIXED_HEADER_SIZE + *@payload :data to be copied into the rtp packet. + *@payload_size : size of data carried by the rtp packet. + * + * Allocates a new rtp packet. In the header, ssrc and payload_type according to the session's + * context. Timestamp and seq number are not set, there will be set when the packet is going to be + * sent with rtp_session_sendm_with_ts(). + * + *Returns: a rtp packet in a mblk_t (message block) structure. +**/ +mblk_t * rtp_session_create_packet(RtpSession *session,gint header_size, const char *payload, gint payload_size) +{ + mblk_t *mp; + gint msglen=header_size+payload_size; + rtp_header_t *rtp; + + mp=allocb(msglen,BPRI_MED); + rtp=(rtp_header_t*)mp->b_rptr; + rtp->version = 2; + rtp->padbit = 0; + rtp->extbit = 0; + rtp->markbit= 0; + rtp->cc = 0; + rtp->paytype = session->payload_type; + rtp->ssrc = session->send_ssrc; + rtp->timestamp = 0; /* set later, when packet is sended */ + rtp->seq_number = 0; /*set later, when packet is sended */ + /*copy the payload */ + mp->b_wptr+=header_size; + memcpy(mp->b_wptr,payload,payload_size); + mp->b_wptr+=payload_size; + return mp; +} + +/** + *rtp_session_create_packet_with_data: + *@session: a rtp session. + *@payload : the data to be sent with this packet + *@payload_size : size of data + *@freefn : a function that will be called when the payload buffer is no more needed. + * + * Creates a new rtp packet using the given payload buffer (no copy). The header will be allocated separetely. + * In the header, ssrc and payload_type according to the session's + * context. Timestamp and seq number are not set, there will be set when the packet is going to be + * sent with rtp_session_sendm_with_ts(). + * oRTP will send this packet using libc's sendmsg() (if this function is availlable!) so that there will be no + * packet concatenation involving copies to be done in user-space. + * @freefn can be NULL, in that case payload will be kept untouched. + * + *Returns: a rtp packet in a mblk_t (message block) structure. +**/ + +mblk_t * rtp_session_create_packet_with_data(RtpSession *session, char *payload, gint payload_size, void (*freefn)(void*)) +{ + mblk_t *mp,*mpayload; + gint header_size=RTP_FIXED_HEADER_SIZE; /* revisit when support for csrc is done */ + rtp_header_t *rtp; + + mp=allocb(header_size,BPRI_MED); + rtp=(rtp_header_t*)mp->b_rptr; + rtp->version = 2; + rtp->padbit = 0; + rtp->extbit = 0; + rtp->markbit= 0; + rtp->cc = 0; + rtp->paytype = session->payload_type; + rtp->ssrc = session->send_ssrc; + rtp->timestamp = 0; /* set later, when packet is sended */ + rtp->seq_number = 0; /*set later, when packet is sended */ + mp->b_wptr+=header_size; + /* create a mblk_t around the user supplied payload buffer */ + mpayload=allocb_with_buf(payload,payload_size,BPRI_MED,freefn); + mpayload->b_wptr+=payload_size; + /* link it with the header */ + mp->b_cont=mpayload; + return mp; +} + + +/** + *rtp_session_create_packet_in_place: + *@session: a rtp session. + *@buffer: a buffer that contains first just enough place to write a RTP header, then the data to send. + *@size : the size of the buffer + *@freefn : a function that will be called once the buffer is no more needed (the data has been sent). + * + * Creates a new rtp packet using the buffer given in arguments (no copy). + * In the header, ssrc and payload_type according to the session's + * context. Timestamp and seq number are not set, there will be set when the packet is going to be + * sent with rtp_session_sendm_with_ts(). + * @freefn can be NULL, in that case payload will be kept untouched. + * + *Returns: a rtp packet in a mblk_t (message block) structure. +**/ +mblk_t * rtp_session_create_packet_in_place(RtpSession *session,char *buffer, gint size, void (*freefn)(void*) ) +{ + mblk_t *mp; + rtp_header_t *rtp; + + mp=allocb_with_buf(buffer,size,BPRI_MED,freefn); + + rtp=(rtp_header_t*)mp->b_rptr; + rtp->version = 2; + rtp->padbit = 0; + rtp->extbit = 0; + rtp->markbit= 0; + rtp->cc = 0; + rtp->paytype = session->payload_type; + rtp->ssrc = session->send_ssrc; + rtp->timestamp = 0; /* set later, when packet is sended */ + rtp->seq_number = 0; /*set later, when packet is sended */ + return mp; +} + + +/** + *rtp_session_sendm_with_ts: + *@session : a rtp session. + *@mp : a rtp packet presented as a mblk_t. + *@timestamp: the timestamp of the data to be sent. Refer to the rfc to know what it is. + * + * Send the rtp datagram @mp to the destination set by rtp_session_set_remote_addr() + * with timestamp @timestamp. For audio data, the timestamp is the number + * of the first sample resulting of the data transmitted. See rfc1889 for details. + * The packet (@mp) is freed once it is sended. + * + *Returns: the number of bytes sent over the network. +**/ +gint +rtp_session_sendm_with_ts (RtpSession * session, mblk_t *mp, guint32 timestamp) +{ + rtp_header_t *rtp; + guint32 packet_time; + gint error = 0; + gint payloadsize; + RtpScheduler *sched=session->sched; + RtpStream *stream=&session->rtp; + + if (session->flags & RTP_SESSION_SEND_NOT_STARTED) + { + session->rtp.snd_ts_offset = timestamp; + if (session->flags & RTP_SESSION_SCHEDULED) + { + session->rtp.snd_time_offset = sched->time_; + } + rtp_session_unset_flag (session,RTP_SESSION_SEND_NOT_STARTED); + } + /* if we are in blocking mode, then suspend the process until the scheduler it's time to send the + * next packet */ + /* if the timestamp of the packet queued is older than current time, then you we must + * not block */ + if (session->flags & RTP_SESSION_SCHEDULED) + { + packet_time = + rtp_session_ts_to_time (session, + timestamp - + session->rtp.snd_ts_offset) + + session->rtp.snd_time_offset; + /*g_message("rtp_session_send_with_ts: packet_time=%i time=%i",packet_time,sched->time_);*/ + wait_point_lock(&session->send_wp); + if (TIME_IS_STRICTLY_NEWER_THAN (packet_time, sched->time_)) + { + wait_point_wakeup_at(&session->send_wp,packet_time,(session->flags & RTP_SESSION_BLOCKING_MODE)!=0); + session_set_clr(&sched->w_sessions,session); /* the session has written */ + } + else session_set_set(&sched->w_sessions,session); /*to indicate select to return immediately */ + wait_point_unlock(&session->send_wp); + } + + + rtp=(rtp_header_t*)mp->b_rptr; + + payloadsize = msgdsize(mp) - RTP_FIXED_HEADER_SIZE - (rtp->cc*sizeof(guint32)); + rtp_session_lock (session); + + /* set a seq number */ + rtp->seq_number=session->rtp.snd_seq; + rtp->timestamp=timestamp; + session->rtp.snd_seq++; + session->rtp.snd_last_ts = timestamp; + + + ortp_global_stats.sent += payloadsize; + stream->stats.sent += payloadsize; + ortp_global_stats.packet_sent++; + stream->stats.packet_sent++; + + error = ortp_rtp_send (session, mp); + rtp_session_rtcp_process(session); + rtp_session_unlock (session); + + return error; +} + + +/** + *rtp_session_send_with_ts: + *@session: a rtp session. + *@buffer: a buffer containing the data to be sent in a rtp packet. + *@len: the length of the data buffer, in bytes. + *@userts: the timestamp of the data to be sent. Refer to the rfc to know what it is. + * + * Send a rtp datagram to the destination set by rtp_session_set_remote_addr() containing + * the data from @buffer with timestamp @userts. This is a high level function that uses + * rtp_session_create_packet() and rtp_session_sendm_with_ts() to send the data. + * + * + *Returns: the number of bytes sent over the network. +**/ +gint +rtp_session_send_with_ts (RtpSession * session, const gchar * buffer, gint len, + guint32 userts) +{ + mblk_t *m; + int err; +#ifdef USE_SENDMSG + m=rtp_session_create_packet_with_data(session,(gchar*)buffer,len,NULL); +#else + m = rtp_session_create_packet(session,RTP_FIXED_HEADER_SIZE,(gchar*)buffer,len); +#endif + err=rtp_session_sendm_with_ts(session,m,userts); + return err; +} + + +static gint +rtp_recv (RtpSession * session, guint32 user_ts) +{ + gint error; + struct sockaddr remaddr; + socklen_t addrlen = sizeof (remaddr); + char *p; + mblk_t *mp; + RtpStream *stream=&session->rtp; + + if (session->rtp.socket<1) return -1; /*session has no sockets for the moment*/ + + + while (1) + { + if (session->rtp.cached_mp==NULL) + session->rtp.cached_mp = allocb (session->max_buf_size, 0); + mp=session->rtp.cached_mp; + if (session->flags & RTP_SESSION_USING_EXT_SOCKETS){ + error=recv(session->rtp.socket,mp->b_wptr,session->max_buf_size,0); + }else error = recvfrom (session->rtp.socket, mp->b_wptr, + session->max_buf_size, 0, + (struct sockaddr *) &remaddr, + &addrlen); + if (error > 0) + { + if (errorstats.bad++; + ortp_global_stats.bad++; + /* don't free, it will be reused next time */ + }else{ + /* resize the memory allocated to fit the udp message */ + + p = g_realloc (mp->b_wptr, error); + if (p != mp->b_wptr) + ortp_debug("The recv area has moved during reallocation."); + mp->b_datap->db_base = mp->b_rptr = + mp->b_wptr = p; + mp->b_wptr += error; + mp->b_datap->db_lim = mp->b_wptr; + /* then parse the message and put on queue */ + rtp_parse (session, mp, user_ts + session->rtp.hwrcv_diff_ts); + session->rtp.cached_mp=NULL; + } + } + else + { + if (error == 0) + { + g_warning + ("rtp_recv: strange... recv() returned zero."); + } + else if (errno!=EWOULDBLOCK && errno!=EAGAIN) + { + if (session->on_network_error.count>0){ + rtp_signal_table_emit3(&session->on_network_error,(gpointer)"Error receiving RTP packet",INT_TO_POINTER(getSocketErrorCode())); + }else g_warning("Error receiving RTP packet: %s.",getSocketError()); + } + /* don't free the cached_mp, it will be reused next time */ + return -1; /* avoids an infinite loop ! */ + } + } + return error; +} + +extern void rtcp_parse(RtpSession *session, mblk_t *mp); + +static gint +rtcp_recv (RtpSession * session) +{ + gint error; + struct sockaddr remaddr; + socklen_t addrlen=0; + char *p; + mblk_t *mp; + + + if (session->rtcp.socket<1) return -1; /*session has no rtcp sockets for the moment*/ + + + while (1) + { + if (session->rtcp.cached_mp==NULL) + session->rtcp.cached_mp = allocb (RTCP_MAX_RECV_BUFSIZE, 0); + + mp=session->rtcp.cached_mp; + if (session->flags & RTP_SESSION_USING_EXT_SOCKETS){ + error=recv(session->rtcp.socket,mp->b_wptr,RTCP_MAX_RECV_BUFSIZE,0); + }else { + addrlen=sizeof (remaddr); + error=recvfrom (session->rtcp.socket, mp->b_wptr, + RTCP_MAX_RECV_BUFSIZE, 0, + (struct sockaddr *) &remaddr, + &addrlen); + } + if (error > 0) + { + /* resize the memory allocated to fit the udp message */ + + p = g_realloc (mp->b_wptr, error); + if (p != mp->b_wptr) + ortp_debug("The recv area has moved during reallocation."); + mp->b_datap->db_base = mp->b_rptr = + mp->b_wptr = p; + mp->b_wptr += error; + mp->b_datap->db_lim = mp->b_wptr; + /* then parse the message */ + rtcp_parse (session, mp); + freemsg(mp); + session->rtcp.cached_mp=NULL; + if (addrlen>0){ + /* store the sender rtcp address to send him receiver reports */ + memcpy(&session->rtcp.rem_addr,&remaddr,addrlen); + } + } + else + { + if (error == 0) + { + g_warning + ("rtcp_recv: strange... recv() returned zero."); + } + else if (errno!=EWOULDBLOCK && errno!=EAGAIN) + { + if (session->on_network_error.count>0){ + rtp_signal_table_emit3(&session->on_network_error,(gpointer)"Error receiving RTCP packet",INT_TO_POINTER(getSocketErrorCode())); + }else g_warning("Error receiving RTCP packet: %s.",getSocketError()); + } + /* don't free the cached_mp, it will be reused next time */ + return -1; /* avoids an infinite loop ! */ + } + } + return error; +} + + +static void payload_type_changed_incoming(RtpSession *session, int paytype){ + /* check if we support this payload type */ + PayloadType *pt=rtp_profile_get_payload(session->profile,paytype); + if (pt!=0){ + g_message ("rtp_parse: payload type changed to %i(%s) !", + paytype,pt->mime_type); + session->payload_type = paytype; + payload_type_changed(session,pt); + rtp_signal_table_emit (&session->on_payload_type_changed); + }else{ + g_warning("Receiving packet with unknown payload type %i.",paytype); + } +} + + +/** + *rtp_session_recvm_with_ts: + *@session: a rtp session. + *@user_ts: a timestamp. + * + * Try to get a rtp packet presented as a mblk_t structure from the rtp session. + * The @user_ts parameter is relative to the first timestamp of the incoming stream. In other + * words, the application does not have to know the first timestamp of the stream, it can + * simply call for the first time this function with @user_ts=0, and then incrementing it + * as it want. The RtpSession takes care of synchronisation between the stream timestamp + * and the user timestamp given here. + * + *Returns: a rtp packet presented as a mblk_t. +**/ + +mblk_t * +rtp_session_recvm_with_ts (RtpSession * session, guint32 user_ts) +{ + mblk_t *mp = NULL; + rtp_header_t *rtp; + guint32 ts; + guint32 packet_time; + RtpScheduler *sched=session->sched; + RtpStream *stream=&session->rtp; + gint rejected=0; + + /* if we are scheduled, remember the scheduler time at which the application has + * asked for its first timestamp */ + + if (session->flags & RTP_SESSION_RECV_NOT_STARTED) + { + + session->rtp.rcv_query_ts_offset = user_ts; + if (session->flags & RTP_SESSION_SCHEDULED) + { + session->rtp.rcv_time_offset = sched->time_; + //g_message("setting snd_time_offset=%i",session->rtp.snd_time_offset); + } + rtp_session_unset_flag (session,RTP_SESSION_RECV_NOT_STARTED); + } + session->rtp.rcv_last_app_ts = user_ts; + rtp_recv (session, user_ts); + rtcp_recv(session); + /* check for telephone event first */ + /* first lock the session */ + rtp_session_lock (session); + mp=getq(&session->rtp.tev_rq); + if (mp!=NULL){ + rtp_signal_table_emit2(&session->on_telephone_event_packet,(gpointer)mp); + if (session->on_telephone_event.count>0){ + rtp_session_check_telephone_events(session,mp); + } + freemsg(mp); + mp=NULL; + } + + /* then now try to return a media packet, if possible */ + /* first condition: if the session is starting, don't return anything + * until the queue size reaches jitt_comp */ + + if (session->flags & RTP_SESSION_RECV_SYNC) + { + rtp_header_t *oldest, *newest; + queue_t *q = &session->rtp.rq; + if (qempty(q)) + { + ortp_debug ("Queue is empty."); + goto end; + } + oldest = (rtp_header_t *) qfirst(q)->b_rptr; + newest = (rtp_header_t *) qlast(q)->b_rptr; + if ((guint32) (newest->timestamp - oldest->timestamp) < + session->rtp.jittctl.jitt_comp_ts) + { + ortp_debug("Not enough packet bufferised."); + goto end; + } + /* enough packet bufferised */ + mp = getq (&session->rtp.rq); + rtp = (rtp_header_t *) mp->b_rptr; + session->rtp.rcv_ts_offset = rtp->timestamp; + /* remember the timestamp offset between the stream timestamp (random) + * and the user timestamp, that very often starts at zero */ + session->rtp.rcv_diff_ts = rtp->timestamp - user_ts; + /* remember the difference between the last received on the socket timestamp and the user timestamp */ + session->rtp.hwrcv_diff_ts=session->rtp.rcv_diff_ts + session->rtp.jittctl.jitt_comp_ts; + session->rtp.rcv_last_ret_ts = user_ts; /* just to have an init value */ + session->rtp.rcv_last_ts = rtp->timestamp; + session->recv_ssrc = rtp->ssrc; + /* delete the recv synchronisation flag */ + rtp_session_unset_flag (session, RTP_SESSION_RECV_SYNC); + ortp_debug("Returning FIRST packet with ts=%i, hwrcv_diff_ts=%i, rcv_diff_ts=%i", rtp->timestamp, + session->rtp.hwrcv_diff_ts,session->rtp.rcv_diff_ts); + + goto end; + } + /* else this the normal case */ + /*calculate the stream timestamp from the user timestamp */ + ts = user_ts + session->rtp.rcv_diff_ts; + session->rtp.rcv_last_ts = ts; + mp = rtp_getq (&session->rtp.rq, ts,&rejected); + + stream->stats.skipped+=rejected; + ortp_global_stats.skipped+=rejected; + + /* perhaps we can now make some checks to see if a resynchronization is needed */ + /* TODO */ + goto end; + + end: + if (mp != NULL) + { + int msgsize = msgdsize (mp); /* evaluate how much bytes (including header) is received by app */ + guint32 packet_ts; + ortp_global_stats.recv += msgsize; + stream->stats.recv += msgsize; + rtp = (rtp_header_t *) mp->b_rptr; + packet_ts=rtp->timestamp; + ortp_debug("Returning mp with ts=%i", packet_ts); + /* check for payload type changes */ + if (session->payload_type != rtp->paytype) + { + payload_type_changed_incoming(session, rtp->paytype); + } + /* patch the packet so that it has a timestamp compensated by the + adaptive jitter buffer mechanism */ + if (session->rtp.jittctl.adaptive){ + rtp->timestamp-=session->rtp.jittctl.corrective_slide; + /*printf("Returned packet has timestamp %u, with clock slide compensated it is %u\n",packet_ts,rtp->timestamp);*/ + } + } + else + { + ortp_debug ("No mp for timestamp queried"); + stream->stats.unavaillable++; + ortp_global_stats.unavaillable++; + } + rtp_session_rtcp_process(session); + rtp_session_unlock (session); + + if (session->flags & RTP_SESSION_SCHEDULED) + { + /* if we are in blocking mode, then suspend the calling process until timestamp + * wanted expires */ + /* but we must not block the process if the timestamp wanted by the application is older + * than current time */ + packet_time = + rtp_session_ts_to_time (session, + user_ts - + session->rtp.rcv_query_ts_offset) + + session->rtp.rcv_time_offset; + ortp_debug ("rtp_session_recvm_with_ts: packet_time=%i, time=%i",packet_time, sched->time_); + wait_point_lock(&session->recv_wp); + if (TIME_IS_STRICTLY_NEWER_THAN (packet_time, sched->time_)) + { + wait_point_wakeup_at(&session->recv_wp,packet_time, (session->flags & RTP_SESSION_BLOCKING_MODE)!=0); + session_set_clr(&sched->r_sessions,session); + } + else session_set_set(&sched->r_sessions,session); /*to unblock _select() immediately */ + wait_point_unlock(&session->recv_wp); + } + return mp; +} + + +gint msg_to_buf (mblk_t * mp, char *buffer, gint len) +{ + gint rlen = len; + mblk_t *m, *mprev; + gint mlen; + m = mp->b_cont; + mprev = mp; + while (m != NULL) + { + mlen = m->b_wptr - m->b_rptr; + if (mlen <= rlen) + { + mblk_t *consumed = m; + memcpy (buffer, m->b_rptr, mlen); + /* go to next mblk_t */ + mprev->b_cont = m->b_cont; + m = m->b_cont; + consumed->b_cont = NULL; + freeb (consumed); + buffer += mlen; + rlen -= mlen; + } + else + { /*if mlen>rlen */ + memcpy (buffer, m->b_rptr, rlen); + m->b_rptr += rlen; + return len; + } + } + return len - rlen; +} + +/** + *rtp_session_recv_with_ts: + *@session: a rtp session. + *@buffer: a user supplied buffer to write the data. + *@len: the length in bytes of the user supplied buffer. + *@time: the timestamp wanted. + *@have_more: the address of an integer to indicate if more data is availlable for the given timestamp. + * + * Tries to read the bytes of the incoming rtp stream related to timestamp @time. In case + * where the user supplied buffer @buffer is not large enough to get all the data + * related to timestamp @time, then *( @have_more) is set to 1 to indicate that the application + * should recall the function with the same timestamp to get more data. + * + * When the rtp session is scheduled (see rtp_session_set_scheduling_mode() ), and the + * blocking mode is on (see rtp_session_set_blocking_mode() ), then the calling thread + * is suspended until the timestamp given as argument expires, whatever a received packet + * fits the query or not. + * + * Important note: it is clear that the application cannot know the timestamp of the first + * packet of the incoming stream, because it can be random. The @time timestamp given to the + * function is used relatively to first timestamp of the stream. In simple words, 0 is a good + * value to start calling this function. + * + * This function internally calls rtp_session_recvm_with_ts() to get a rtp packet. The content + * of this packet is then copied into the user supplied buffer in an intelligent manner: + * the function takes care of the size of the supplied buffer and the timestamp given in + * argument. Using this function it is possible to read continous audio data (e.g. pcma,pcmu...) + * with for example a standart buffer of size of 160 with timestamp incrementing by 160 while the incoming + * stream has a different packet size. + * + *Returns: if a packet was availlable with the corresponding timestamp supplied in argument + * then the number of bytes written in the user supplied buffer is returned. If no packets + * are availlable, either because the sender has not started to send the stream, or either + * because silence packet are not transmitted, or either because the packet was lost during + * network transport, then the function returns zero. +**/ +gint rtp_session_recv_with_ts (RtpSession * session, gchar * buffer, + gint len, guint32 time, gint * have_more) +{ + mblk_t *mp; + gint rlen = len; + gint wlen, mlen; + guint32 ts_int = 0; /*the length of the data returned in the user supplied buffer, in TIMESTAMP UNIT */ + PayloadType *payload; + RtpStream *stream=&session->rtp; + + *have_more = 0; + + mp = rtp_session_recvm_with_ts (session, time); + payload =rtp_profile_get_payload (session->profile, + session->payload_type); + if (payload==NULL){ + g_warning("rtp_session_recv_with_ts: unable to recv an unsupported payload."); + if (mp!=NULL) freemsg(mp); + return -1; + } + if (!(session->flags & RTP_SESSION_RECV_SYNC)) + { + //ortp_debug("time=%i rcv_last_ret_ts=%i",time,session->rtp.rcv_last_ret_ts); + if (RTP_TIMESTAMP_IS_STRICTLY_NEWER_THAN + (time, session->rtp.rcv_last_ret_ts)) + { + /* the user has missed some data previously, so we are going to give him now. */ + /* we must tell him to call the function once again with the same timestamp + * by setting *have_more=1 */ + *have_more = 1; + } + if (payload->type == PAYLOAD_AUDIO_CONTINUOUS) + { + ts_int = (len * payload->bits_per_sample) >> 3; + session->rtp.rcv_last_ret_ts += ts_int; + //ortp_debug("ts_int=%i",ts_int); + } + else + ts_int = 0; + } + else return 0; + + /* try to fill the user buffer */ + while (1) + { + + if (mp != NULL) + { + mlen = msgdsize (mp->b_cont); + wlen = msg_to_buf (mp, buffer, rlen); + buffer += wlen; + rlen -= wlen; + ortp_debug("mlen=%i wlen=%i rlen=%i", mlen, wlen, + rlen); + /* do we fill all the buffer ? */ + if (rlen > 0) + { + /* we did not fill all the buffer */ + freemsg (mp); + /* if we have continuous audio, try to get other packets to fill the buffer, + * ie continue the loop */ + //ortp_debug("User buffer not filled entirely"); + if (ts_int > 0) + { + time = session->rtp.rcv_last_ret_ts; + ortp_debug("Need more: will ask for %i.", + time); + } + else + return len - rlen; + } + else if (mlen > wlen) + { + int unread = + mlen - wlen + (mp->b_wptr - + mp->b_rptr); + /* not enough space in the user supplied buffer */ + /* we re-enqueue the msg with its updated read pointers for next time */ + ortp_debug ("Re-enqueuing packet."); + rtp_session_lock (session); + rtp_putq (&session->rtp.rq, mp); + rtp_session_unlock (session); + /* quite ugly: I change the stats ... */ + ortp_global_stats.recv -= unread; + stream->stats.recv -= unread; + return len; + } + else + { + /* the entire packet was written to the user buffer */ + freemsg (mp); + return len; + } + } + else + { + /* fill with a zero pattern (silence) */ + if (payload->pattern_length != 0) + { + int i = 0, j = 0; + while (i < rlen) + { + buffer[i] = payload->zero_pattern[j]; + i++; + j++; + if (j <= payload->pattern_length) + j = 0; + } + return len; + } + *have_more = 0; + return 0; + } + mp = rtp_session_recvm_with_ts (session, time); + payload = rtp_profile_get_payload (session->profile, + session->payload_type); + if (payload==NULL){ + g_warning("rtp_session_recv_with_ts: unable to recv an unsupported payload."); + if (mp!=NULL) freemsg(mp); + return -1; + } + } + return -1; +} +/** + *rtp_session_get_current_send_ts: + *@session: a rtp session. + * + * When the rtp session is scheduled and has started to send packets, this function + * computes the timestamp that matches to the present time. Using this function can be + * usefull when sending discontinuous streams. Some time can be elapsed between the end + * of a stream burst and the begin of a new stream burst, and the application may be not + * not aware of this elapsed time. In order to get a valid (current) timestamp to pass to + * #rtp_session_send_with_ts() or #rtp_session_sendm_with_ts(), the application may + * use rtp_session_get_current_send_ts(). + * + *Returns: the current send timestamp for the rtp session. +**/ +guint32 rtp_session_get_current_send_ts(RtpSession *session) +{ + guint32 userts; + guint32 session_time; + RtpScheduler *sched=session->sched; + PayloadType *payload; + g_return_val_if_fail (session->payload_type<128, 0); + payload=rtp_profile_get_payload(session->profile,session->payload_type); + g_return_val_if_fail(payload!=NULL, 0); + if ( (session->flags & RTP_SESSION_SCHEDULED)==0 ){ + g_warning("can't guess current timestamp because session is not scheduled."); + return 0; + } + session_time=sched->time_-session->rtp.snd_time_offset; + userts= (guint32)( ( (gdouble)(session_time) * (gdouble) payload->clock_rate )/ 1000.0) + + session->rtp.snd_ts_offset; + return userts; +} + +/** + *rtp_session_get_current_recv_ts: + *@session: a rtp session. + * + * Same thing as rtp_session_get_current_send_ts() except that it's for an incoming stream. + * Works only on scheduled mode. + * + * Returns: the theoritical that would have to be receive now. + * +**/ +guint32 rtp_session_get_current_recv_ts(RtpSession *session){ + guint32 userts; + guint32 session_time; + RtpScheduler *sched=ortp_get_scheduler(); + PayloadType *payload; + g_return_val_if_fail (session->payload_type<128, 0); + payload=rtp_profile_get_payload(session->profile,session->payload_type); + g_return_val_if_fail(payload!=NULL, 0); + if ( (session->flags & RTP_SESSION_SCHEDULED)==0 ){ + g_warning("can't guess current timestamp because session is not scheduled."); + return 0; + } + session_time=sched->time_-session->rtp.rcv_time_offset; + userts= (guint32)( ( (gdouble)(session_time) * (gdouble) payload->clock_rate )/ 1000.0) + + session->rtp.rcv_ts_offset; + return userts; +} + +/** + *rtp_session_set_time_jump_limit: + *@session: the rtp session + *@ts_step: a time interval in miliseconds + * + * oRTP has the possibility to inform the application through a callback registered + * with rtp_session_signal_connect about crazy incoming RTP stream that jumps from + * a timestamp N to N+. This lets the opportunity for the application + * to reset the session in order to resynchronize, or any other action like stopping the call + * and reporting an error. +**/ +void rtp_session_set_time_jump_limit(RtpSession *session, gint milisecs){ + guint32 ts; + session->rtp.time_jump=milisecs; + ts=rtp_session_time_to_ts(session,milisecs); + if (ts==0) session->rtp.ts_jump=1<<31; /* do not detect ts jump */ + else session->rtp.ts_jump=ts; +} + +void rtp_session_uninit (RtpSession * session) +{ + /* first of all remove the session from the scheduler */ + if (session->flags & RTP_SESSION_SCHEDULED) + { + rtp_scheduler_remove_session (session->sched,session); + } + /*flush all queues */ + flushq (&session->rtp.rq, FLUSHALL); + + /* close sockets */ + close_socket (session->rtp.socket); + close_socket (session->rtcp.socket); + + wait_point_uninit(&session->send_wp); + wait_point_uninit(&session->recv_wp); + g_mutex_free (session->lock); + session->lock=NULL; + if (session->current_tev!=NULL) freemsg(session->current_tev); + if (session->rtp.cached_mp!=NULL) freemsg(session->rtp.cached_mp); + if (session->rtcp.cached_mp!=NULL) freemsg(session->rtcp.cached_mp); + if (session->sd!=NULL) freemsg(session->sd); +} + +/** + *rtp_session_reset: + *@session: a rtp session. + * + * Reset the session: local and remote addresses are kept unchanged but the internal + * queue for ordering and buffering packets is flushed, the session is ready to be + * re-synchronised to another incoming stream. + * +**/ +void rtp_session_reset (RtpSession * session) +{ + + if (session->flags & RTP_SESSION_SCHEDULED) rtp_session_lock (session); + + flushq (&session->rtp.rq, FLUSHALL); + rtp_session_set_flag (session, RTP_SESSION_RECV_SYNC); + rtp_session_set_flag (session, RTP_SESSION_SEND_SYNC); + rtp_session_set_flag (session, RTP_SESSION_RECV_NOT_STARTED); + rtp_session_set_flag (session, RTP_SESSION_SEND_NOT_STARTED); + //session->ssrc=0; + session->rtp.snd_time_offset = 0; + session->rtp.snd_ts_offset = 0; + session->rtp.snd_rand_offset = 0; + session->rtp.snd_last_ts = 0; + session->rtp.rcv_time_offset = 0; + session->rtp.rcv_ts_offset = 0; + session->rtp.rcv_query_ts_offset = 0; + session->rtp.rcv_diff_ts = 0; + session->rtp.rcv_ts = 0; + session->rtp.rcv_last_ts = 0; + session->rtp.rcv_last_app_ts = 0; + session->rtp.hwrcv_extseq.one = 0; + session->rtp.hwrcv_since_last_SR=0; + session->rtp.snd_seq = 0; + rtp_stats_reset(&session->rtp.stats); + jitter_control_init(&session->rtp.jittctl,-1,NULL); + + if (session->flags & RTP_SESSION_SCHEDULED) rtp_session_unlock (session); + +} + +/** + *rtp_session_destroy: + *@session: a rtp session. + * + * Destroys a rtp session. + * +**/ +void rtp_session_destroy (RtpSession * session) +{ + rtp_session_uninit (session); + g_free (session); +} + +guint32 rtp_session_time_to_ts(RtpSession *session, gint time){ + PayloadType *payload; + g_return_val_if_fail (session->payload_type < 127, 0); + payload = + rtp_profile_get_payload (session->profile, + session->payload_type); + if (payload == NULL) + { + g_warning + ("rtp_session_ts_to_t: use of unsupported payload type."); + return 0; + } + /* the return value is in milisecond */ + return (double)payload->clock_rate*(double)time/1000.0; +} + +/* function used by the scheduler only:*/ +guint32 rtp_session_ts_to_time (RtpSession * session, guint32 timestamp) +{ + PayloadType *payload; + g_return_val_if_fail (session->payload_type < 127, 0); + payload = + rtp_profile_get_payload (session->profile, + session->payload_type); + if (payload == NULL) + { + g_warning + ("rtp_session_ts_to_t: use of unsupported payload type."); + return 0; + } + /* the return value is in milisecond */ + return (guint32) (1000.0 * + ((double) timestamp / + (double) payload->clock_rate)); +} + + +/* time is the number of miliseconds elapsed since the start of the scheduler */ +void rtp_session_process (RtpSession * session, guint32 time, RtpScheduler *sched) +{ + wait_point_lock(&session->send_wp); + if (wait_point_check(&session->send_wp,time)){ + session_set_set(&sched->w_sessions,session); + wait_point_wakeup(&session->send_wp); + } + wait_point_unlock(&session->send_wp); + + wait_point_lock(&session->recv_wp); + if (wait_point_check(&session->recv_wp,time)){ + session_set_set(&sched->r_sessions,session); + wait_point_wakeup(&session->recv_wp); + } + wait_point_unlock(&session->recv_wp); +} + + +void rtp_session_make_time_distorsion(RtpSession *session, gint milisec) +{ + session->rtp.snd_time_offset+=milisec; +} + + +/* packet api */ + +void rtp_add_csrc(mblk_t *mp, guint32 csrc) +{ + rtp_header_t *hdr=(rtp_header_t*)mp->b_rptr; + hdr->csrc[hdr->cc]=csrc; + hdr->cc++; +} -- cgit v1.2.1