package five9
import (
"context"
"fmt"
"net/http"
"github.com/equalsgibson/five9-go/five9/five9types"
)
// AgentService exposes the agent-side REST calls of this package. All of its
// methods send their requests through authState.
type AgentService struct {
// authState holds the login used to authenticate and route each request.
authState *authenticationState
}
// GetAllMaintenanceNoticesForSelf fetches the maintenance notices for the
// logged-in agent.
//
// The ":userID" segment of the path is a placeholder that
// requestWithAuthentication replaces before the request is sent.
func (s AgentService) GetAllMaintenanceNoticesForSelf(ctx context.Context) ([]five9types.MaintenanceNoticeInfo, error) {
	const endpoint = "/appsvcs/rs/svc/agents/:userID/maintenance_notices"

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, http.NoBody)
	if err != nil {
		return nil, err
	}

	var notices []five9types.MaintenanceNoticeInfo
	if err := s.authState.requestWithAuthentication(req, &notices); err != nil {
		return nil, err
	}

	return notices, nil
}
// AcceptMaintenanceNoticeForSelf accepts the maintenance notice identified by
// maintenanceNoticeID on behalf of the logged-in agent and returns the notice
// as decoded from the response.
//
// On any failure the zero MaintenanceNoticeInfo is returned with the error.
func (s AgentService) AcceptMaintenanceNoticeForSelf(
	ctx context.Context,
	maintenanceNoticeID five9types.MaintenanceNoticeID,
) (five9types.MaintenanceNoticeInfo, error) {
	endpoint := fmt.Sprintf(
		"/appsvcs/rs/svc/agents/:userID/maintenance_notices/%s/accept",
		maintenanceNoticeID,
	)

	req, err := http.NewRequestWithContext(ctx, http.MethodPut, endpoint, http.NoBody)
	if err != nil {
		return five9types.MaintenanceNoticeInfo{}, err
	}

	var accepted five9types.MaintenanceNoticeInfo
	if err := s.authState.requestWithAuthentication(req, &accepted); err != nil {
		return five9types.MaintenanceNoticeInfo{}, err
	}

	return accepted, nil
}
// GetAllReasonCodes returns the logout reason codes followed by the not-ready
// reason codes. If either lookup fails, its error is returned and no codes.
func (s *AgentService) GetAllReasonCodes(ctx context.Context) ([]five9types.ReasonCodeInfo, error) {
	codes, err := s.getAllLogoutReasonCodes(ctx)
	if err != nil {
		return nil, err
	}

	notReady, err := s.getAllNotReadyReasonCodes(ctx)
	if err != nil {
		return nil, err
	}

	codes = append(codes, notReady...)

	return codes, nil
}
// getAllLogoutReasonCodes fetches the organization's logout reason codes via
// the agent API. ":organizationID" is substituted by requestWithAuthentication.
func (s *AgentService) getAllLogoutReasonCodes(ctx context.Context) ([]five9types.ReasonCodeInfo, error) {
	const endpoint = "/appsvcs/rs/svc/orgs/:organizationID/logout_reason_codes"

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, http.NoBody)
	if err != nil {
		return nil, err
	}

	var codes []five9types.ReasonCodeInfo
	if err := s.authState.requestWithAuthentication(req, &codes); err != nil {
		return nil, err
	}

	return codes, nil
}
// getAllNotReadyReasonCodes fetches the organization's not-ready reason codes
// via the agent API. ":organizationID" is substituted by
// requestWithAuthentication.
func (s *AgentService) getAllNotReadyReasonCodes(ctx context.Context) ([]five9types.ReasonCodeInfo, error) {
	const endpoint = "/appsvcs/rs/svc/orgs/:organizationID/not_ready_reason_codes"

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, http.NoBody)
	if err != nil {
		return nil, err
	}

	var codes []five9types.ReasonCodeInfo
	if err := s.authState.requestWithAuthentication(req, &codes); err != nil {
		return nil, err
	}

	return codes, nil
}
package five9
import (
"context"
"fmt"
"net/http"
"strings"
"sync"
"time"
"github.com/equalsgibson/five9-go/five9/five9types"
)
// authenticationState holds the cached login for one API context (agent or
// supervisor) together with the HTTP client used to talk to Five9.
type authenticationState struct {
// client performs the HTTP round trips and carries the credentials.
client *client
// loginResponse is the cached login; nil means no login is cached.
loginResponse *five9types.LoginResponse
// loginMutex is held while logging in and while clearing loginResponse.
loginMutex *sync.Mutex
// apiContextPath is the URL prefix of the API this state authenticates
// against; it is compared with supervisorAPIContextPath to pick the
// "agents" or "supervisors" path segment.
apiContextPath string
}
// endpointGetSessionMetadata issues a GET to "/<apiContextPath>/auth/metadata".
// The response body is not decoded; only the error, if any, is reported.
func (a *authenticationState) endpointGetSessionMetadata(ctx context.Context) error {
	endpoint := fmt.Sprintf("/%s/auth/metadata", a.apiContextPath)

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, http.NoBody)
	if err != nil {
		return err
	}

	return a.requestWithAuthentication(req, nil)
}
// requestWithAuthentication sends request to the API host of the current login
// and decodes the response into target (a nil target skips decoding).
//
// Before sending, the URL scheme is forced to https, the host is taken from the
// login, and the ":userID" and ":organizationID" placeholders in the path are
// replaced with the identifiers of the login.
//
// Error handling:
//   - 401: retried, up to three attempts in total, waiting two seconds between
//     attempts (the wait is abandoned if the request context is cancelled).
//   - 435: the cached login is cleared and the error is returned.
//   - anything else: returned to the caller immediately.
func (a *authenticationState) requestWithAuthentication(request *http.Request, target any) error {
	const (
		maxAttempts = 3
		retryDelay  = 2 * time.Second
		// Five9 reply with Status 435 if a service has been migrated. This is
		// not an official status code, so it has no net/http constant.
		statusServiceMigrated = 435
	)

	ctx := request.Context()

	login, err := a.getLogin(ctx)
	if err != nil {
		return err
	}

	request.URL.Scheme = "https"
	request.URL.Host = login.GetAPIHost()
	request.URL.Path = strings.ReplaceAll(request.URL.Path, ":userID", string(login.UserID))
	request.URL.Path = strings.ReplaceAll(request.URL.Path, ":organizationID", string(login.OrgID))

	var latestAttemptErr error

	for attempt := 1; attempt <= maxAttempts; attempt++ {
		// The previous attempt consumed the body; rewind it when the request
		// knows how to (net/http sets GetBody for in-memory readers).
		if attempt > 1 && request.GetBody != nil {
			body, err := request.GetBody()
			if err != nil {
				return err
			}

			request.Body = body
		}

		latestAttemptErr = a.client.request(request, target)
		if latestAttemptErr == nil {
			return nil
		}

		five9Error, ok := latestAttemptErr.(*Error)
		if !ok {
			// Transport, decoding and pre-processor failures are not retried.
			return latestAttemptErr
		}

		switch five9Error.StatusCode {
		case http.StatusUnauthorized:
			// The login is not registered by other endpoints for a short time.
			// I think this has to do with Five9 propagating the session across their data centers.
			// We login using the app.five9.com domain but then make subsequent calls to the data center specific domain
			if attempt == maxAttempts {
				return latestAttemptErr
			}

			timer := time.NewTimer(retryDelay)
			select {
			case <-ctx.Done():
				timer.Stop()

				return ctx.Err()
			case <-timer.C:
			}
		case statusServiceMigrated:
			// Clear out the login state so the next call logs in again.
			// NOTE(review): getLogin holds loginMutex while it calls back into
			// this function, so a 435 during the login flow would block here —
			// confirm whether that can happen in practice.
			a.loginMutex.Lock()
			a.loginResponse = nil
			a.loginMutex.Unlock()

			return latestAttemptErr
		default:
			return latestAttemptErr
		}
	}

	return latestAttemptErr
}
// getLogin returns the cached login, performing the login and session set-up
// first when nothing is cached.
//
// If any step after the login request fails, the cached login is cleared again
// so that the next call repeats the whole flow instead of reusing a login
// whose session was never fully established.
func (a *authenticationState) getLogin(
	ctx context.Context,
) (*five9types.LoginResponse, error) {
	// Unlocked fast path. The session set-up calls below go through
	// requestWithAuthentication, which calls getLogin again while loginMutex
	// is held; this check lets those nested calls return instead of blocking.
	if cached := a.loginResponse; cached != nil {
		return cached, nil
	}

	a.loginMutex.Lock()
	defer a.loginMutex.Unlock()

	// Another caller may have completed the login while we waited.
	if a.loginResponse != nil {
		return a.loginResponse, nil
	}

	login, err := a.endpointLogin(ctx)
	if err != nil {
		return nil, err
	}

	// Must be set before prepareSession: its requests need the login.
	a.loginResponse = &login

	if err := a.prepareSession(ctx); err != nil {
		a.loginResponse = nil

		return nil, err
	}

	return a.loginResponse, nil
}

// prepareSession brings a fresh login into a usable session: it requests the
// session metadata, reads the login state, starts or restarts the session when
// the state asks for it, and accepts maintenance notices when required.
func (a *authenticationState) prepareSession(ctx context.Context) error {
	if err := a.endpointGetSessionMetadata(ctx); err != nil {
		return err
	}

	loginState, err := a.endpointGetLoginState(ctx)
	if err != nil {
		return err
	}

	switch loginState {
	case five9types.UserLoginStateSelectStation: // Standard response after logging in.
		if err := a.endpointStartSession(ctx); err != nil {
			return err
		}
	case five9types.UserLoginStateAcceptNotice: // Can occur if Five9 have issued a maintenance notice
		return a.handleMaintenanceNotices(ctx)
	case five9types.UserLoginStateRelogin: // Can occur if the service has been migrated
		if err := a.endpointRestartSession(ctx); err != nil {
			return err
		}
	default:
		return nil
	}

	// Check the login state after starting or restarting the session.
	newLoginState, err := a.endpointGetLoginState(ctx)
	if err != nil {
		return err
	}

	if newLoginState == five9types.UserLoginStateAcceptNotice {
		return a.handleMaintenanceNotices(ctx)
	}

	return nil
}
// endpointLogin posts the client credentials to the app.five9.com login
// endpoint of this API context and returns the decoded login response.
//
// The request is sent directly through the client because no login exists yet.
func (a *authenticationState) endpointLogin(ctx context.Context) (five9types.LoginResponse, error) {
	loginURL := fmt.Sprintf("https://app.five9.com/%s/auth/login", a.apiContextPath)
	body := structToReaderCloser(five9types.LoginPayload{
		PasswordCredentials: a.client.credentials,
		AppKey:              "web-ui",
		Policy:              five9types.PolicyForceIn,
	})

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, loginURL, body)
	if err != nil {
		return five9types.LoginResponse{}, err
	}

	var login five9types.LoginResponse
	if err := a.client.request(req, &login); err != nil {
		return five9types.LoginResponse{}, err
	}

	return login, nil
}
// endpointGetLoginState reads the login state of the current user from
// "/<apiContextPath>/<agents|supervisors>/:userID/login_state". An empty state
// is returned alongside any error.
func (a *authenticationState) endpointGetLoginState(ctx context.Context) (five9types.UserLoginState, error) {
	resource := agentAPIPath
	if a.apiContextPath == supervisorAPIContextPath {
		resource = supervisorAPIPath
	}

	endpoint := fmt.Sprintf("/%s/%s/:userID/login_state", a.apiContextPath, resource)

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, http.NoBody)
	if err != nil {
		return "", err
	}

	var state five9types.UserLoginState
	if err := a.requestWithAuthentication(req, &state); err != nil {
		return "", err
	}

	return state, nil
}
// endpointStartSession issues the forced session_start PUT for the current
// user, sending a station with an empty ID and the type "EMPTY".
func (a *authenticationState) endpointStartSession(ctx context.Context) error {
	resource := agentAPIPath
	if a.apiContextPath == supervisorAPIContextPath {
		resource = supervisorAPIPath
	}

	endpoint := fmt.Sprintf("/%s/%s/:userID/session_start?force=true", a.apiContextPath, resource)
	station := five9types.StationInfo{
		StationID:   "",
		StationType: "EMPTY",
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodPut, endpoint, structToReaderCloser(station))
	if err != nil {
		return err
	}

	return a.requestWithAuthentication(req, nil)
}
// endpointRestartSession issues the session_restart PUT for the current user.
// No request body is sent and no response body is decoded.
func (a *authenticationState) endpointRestartSession(ctx context.Context) error {
	resource := agentAPIPath
	if a.apiContextPath == supervisorAPIContextPath {
		resource = supervisorAPIPath
	}

	endpoint := fmt.Sprintf("/%s/%s/:userID/session_restart", a.apiContextPath, resource)

	req, err := http.NewRequestWithContext(ctx, http.MethodPut, endpoint, http.NoBody)
	if err != nil {
		return err
	}

	return a.requestWithAuthentication(req, nil)
}
// handleMaintenanceNotices accepts every maintenance notice returned for the
// current user, then re-reads the login state and starts a session unless the
// state is already UserLoginStateWorking. The first error encountered aborts
// the sequence.
func (a *authenticationState) handleMaintenanceNotices(ctx context.Context) error {
	pending, err := a.endpointGetMaintenanceNotices(ctx)
	if err != nil {
		return err
	}

	for i := range pending {
		if err := a.endpointAcceptMaintenanceNotice(ctx, pending[i].ID); err != nil {
			return err
		}
	}

	state, err := a.endpointGetLoginState(ctx)
	if err != nil {
		return err
	}

	if state == five9types.UserLoginStateWorking {
		return nil
	}

	return a.endpointStartSession(ctx)
}
// endpointGetMaintenanceNotices lists the maintenance notices of the current
// user for this API context. On success the returned slice is never nil.
func (a *authenticationState) endpointGetMaintenanceNotices(ctx context.Context) ([]five9types.MaintenanceNoticeInfo, error) {
	resource := agentAPIPath
	if a.apiContextPath == supervisorAPIContextPath {
		resource = supervisorAPIPath
	}

	endpoint := fmt.Sprintf("/%s/%s/:userID/maintenance_notices", a.apiContextPath, resource)

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, http.NoBody)
	if err != nil {
		return nil, err
	}

	notices := []five9types.MaintenanceNoticeInfo{}
	if err := a.requestWithAuthentication(req, &notices); err != nil {
		return nil, err
	}

	return notices, nil
}
// endpointAcceptMaintenanceNotice accepts a single maintenance notice for the
// current user. The response is decoded into a throwaway value; only the error
// is reported to the caller.
func (a *authenticationState) endpointAcceptMaintenanceNotice(
	ctx context.Context,
	maintenanceNoticeID five9types.MaintenanceNoticeID,
) error {
	resource := agentAPIPath
	if a.apiContextPath == supervisorAPIContextPath {
		resource = supervisorAPIPath
	}

	endpoint := fmt.Sprintf(
		"/%s/%s/:userID/maintenance_notices/%s/accept",
		a.apiContextPath,
		resource,
		maintenanceNoticeID,
	)

	req, err := http.NewRequestWithContext(ctx, http.MethodPut, endpoint, http.NoBody)
	if err != nil {
		return err
	}

	var accepted five9types.MaintenanceNoticeInfo

	return a.requestWithAuthentication(req, &accepted)
}
package five9
import (
"bytes"
"encoding/json"
"io"
"net/http"
"github.com/equalsgibson/five9-go/five9/five9types"
)
// client is the low-level HTTP layer shared by the services of this package.
type client struct {
// httpClient executes the requests.
httpClient *http.Client
// credentials are sent in the login payload.
credentials five9types.PasswordCredentials
// requestPreProcessors run, in order, on every request before it is sent;
// the first one that returns an error aborts the request.
requestPreProcessors []func(r *http.Request) error
}
// URL building blocks: the context paths are the API prefixes, and the API
// paths are the resource segments that precede the user ID.
const (
supervisorAPIContextPath = "supsvcs/rs/svc"
agentAPIContextPath = "appsvcs/rs/svc"
agentAPIPath = "agents"
supervisorAPIPath = "supervisors"
)
// request sends request with JSON Accept/Content-Type headers after running
// every registered pre-processor, and decodes a successful JSON response into
// target (a nil target skips decoding).
//
// A response with status 400 or above is returned as an *Error carrying the
// status code and the raw body. The Five9 message is extracted from the body
// when it is valid JSON; a body that is empty or not JSON still yields the
// *Error, so callers can always inspect the status code.
func (c *client) request(request *http.Request, target any) error {
	request.Header.Set("Accept", "application/json")
	request.Header.Set("Content-Type", "application/json")

	for _, preProcess := range c.requestPreProcessors {
		if err := preProcess(request); err != nil {
			return err
		}
	}

	response, err := c.httpClient.Do(request)
	if err != nil {
		return err
	}
	defer response.Body.Close()

	if response.StatusCode >= http.StatusBadRequest {
		bodyBytes, err := io.ReadAll(response.Body)
		if err != nil {
			return err
		}

		responseErr := &Error{
			StatusCode: response.StatusCode,
			Body:       bodyBytes,
		}

		// Best effort: json.Unmarshal rejects invalid JSON before it reaches
		// Error.UnmarshalJSON, and that must not hide the HTTP status.
		_ = json.Unmarshal(bodyBytes, responseErr)

		return responseErr
	}

	if target == nil {
		// Drain the body so the transport can reuse the connection.
		_, _ = io.Copy(io.Discard, response.Body)

		return nil
	}

	bodyBytes, err := io.ReadAll(response.Body)
	if err != nil {
		return err
	}

	return json.Unmarshal(bodyBytes, target)
}
// structToReaderCloser JSON-encodes v and returns a reader over the encoded
// bytes.
//
// If v cannot be encoded, the returned reader yields no data and reports the
// encoding error on its first Read, so the failure surfaces as an ordinary
// error when the request body is consumed instead of as a panic.
func structToReaderCloser(v any) io.Reader {
	vBytes, err := json.Marshal(v)
	if err != nil {
		reader, writer := io.Pipe()
		// CloseWithError always returns nil; afterwards every Read on reader
		// returns err.
		_ = writer.CloseWithError(err)

		return reader
	}

	return bytes.NewReader(vBytes)
}
package five9
import (
"net/http"
)
// ConfigFunc is an option that adjusts a Service; NewService applies each one
// it is given, in order, after building the Service.
type ConfigFunc func(*Service)
// AddRequestPreprocessor returns a ConfigFunc that appends things to the
// request pre-processors of the client reached through the agent service.
func AddRequestPreprocessor(things ...func(*http.Request) error) ConfigFunc {
	return func(s *Service) {
		httpLayer := s.agentService.authState.client
		httpLayer.requestPreProcessors = append(httpLayer.requestPreProcessors, things...)
	}
}
// SetWebsocketHandler returns a ConfigFunc that replaces the websocket handler
// of the supervisor service with w.
func SetWebsocketHandler(w webSocketHandler) ConfigFunc {
	return func(s *Service) {
		supervisor := s.supervisorService
		supervisor.webSocketHandler = w
	}
}
// SetRoundTripper returns a ConfigFunc that installs roundTripper as the
// Transport of the HTTP client reached through the agent service.
func SetRoundTripper(roundTripper http.RoundTripper) ConfigFunc {
	return func(s *Service) {
		httpClient := s.agentService.authState.client.httpClient
		httpClient.Transport = roundTripper
	}
}
// func SetMaxCacheAge(maxAge *time.Duration) ConfigFunc {
// return func(s *Service) {
// s.agentService.authState.client.httpClient.Transport = roundTripper
// }
// }
package five9
import (
"encoding/json"
"errors"
"fmt"
)
// five9Error is the JSON envelope decoded from an error response body; only
// the "five9ExceptionDetail" member is read.
type five9Error struct {
Five9ExceptionDetail five9ExceptionDetail `json:"five9ExceptionDetail"`
}
// five9ExceptionDetail is the detail object nested inside five9Error. Its
// Message is what Error.UnmarshalJSON copies into Error.Message.
type five9ExceptionDetail struct {
Timestamp uint `json:"timestamp"`
ErrorCode int `json:"errorCode"`
Message string `json:"message"`
Context errorContext `json:"context"`
}
// errorContext is the "context" object of a five9ExceptionDetail.
type errorContext struct {
ContextCode string `json:"contextCode"`
ObjectID string `json:"objectId"`
}
// Error is the error returned for a REST response with status 400 or above.
type Error struct {
// StatusCode is the HTTP status code of the response.
StatusCode int `json:"status_code"`
// Body is the raw response body.
Body []byte `json:"body"`
// Message is the Five9 exception message, when the body contained one.
Message string `json:"message"`
}
// Error implements the error interface. It returns Message when one is set and
// otherwise a generic text that includes the HTTP status code.
func (err *Error) Error() string {
	if err.Message != "" {
		return err.Message
	}

	return fmt.Sprintf("Five9 REST API Error, Status Code: %d", err.StatusCode)
}
// UnmarshalJSON implements json.Unmarshaler. It copies the exception message
// out of a Five9 error body into Message when a non-empty one is present.
// It never reports an error: a body that does not decode leaves the receiver
// unchanged.
func (err *Error) UnmarshalJSON(b []byte) error {
	var decoded five9Error
	if json.Unmarshal(b, &decoded) != nil {
		return nil
	}

	if message := decoded.Five9ExceptionDetail.Message; message != "" {
		err.Message = message
	}

	return nil
}
// Sentinel errors returned by the supervisor service; compare with errors.Is.
var (
// ErrUnknownUserID is returned when a user ID is not present in the domain
// user map.
ErrUnknownUserID error = errors.New("unknown userID provided")
// ErrWebSocketCacheNotReady and ErrWebSocketCacheStale are returned by the
// WS* accessors in place of the equivalent errors of the utils package.
ErrWebSocketCacheNotReady error = errors.New("webSocket cache is not ready")
ErrWebSocketCacheStale error = errors.New("webSocket cache is stale")
)
package five9
import (
"net/http"
"net/http/cookiejar"
"sync"
"time"
"github.com/equalsgibson/five9-go/five9/five9types"
"github.com/equalsgibson/five9-go/five9/internal/utils"
)
// NewService builds a Service whose agent and supervisor halves share one HTTP
// client (with a cookie jar) and the given credentials.
//
// Every cache except the websocket timers is created with a one-hour allowed
// age. The supervisor caches are reset once, and then each ConfigFunc is
// applied in the order given.
func NewService(
	creds five9types.PasswordCredentials,
	configFuncs ...ConfigFunc,
) *Service {
	// The error from cookiejar.New is discarded, as before.
	jar, _ := cookiejar.New(nil)

	maxAge := time.Hour

	sharedClient := &client{
		credentials:          creds,
		httpClient:           &http.Client{Jar: jar},
		requestPreProcessors: []func(r *http.Request) error{},
	}

	agent := &AgentService{
		authState: &authenticationState{
			client:         sharedClient,
			apiContextPath: agentAPIContextPath,
			loginMutex:     &sync.Mutex{},
		},
	}

	supervisor := &SupervisorService{
		authState: &authenticationState{
			client:         sharedClient,
			apiContextPath: supervisorAPIContextPath,
			loginMutex:     &sync.Mutex{},
		},
		domainMetadataCache: &domainMetadataCache{
			agentInfoState:      utils.NewMemoryCacheInstance[five9types.UserID, five9types.AgentInfo](&maxAge),
			reasonCodeInfoState: utils.NewMemoryCacheInstance[five9types.ReasonCodeID, five9types.ReasonCodeInfo](&maxAge),
			queueInfoState:      utils.NewMemoryCacheInstance[five9types.QueueID, five9types.QueueInfo](&maxAge),
		},
		webSocketHandler: &liveWebsocketHandler{},
		webSocketCache: &supervisorWebSocketCache{
			agentState:      utils.NewMemoryCacheInstance[five9types.UserID, five9types.AgentState](&maxAge),
			agentStatistics: utils.NewMemoryCacheInstance[five9types.UserID, five9types.AgentStatistics](&maxAge),
			acdState:        utils.NewMemoryCacheInstance[five9types.QueueID, five9types.ACDState](&maxAge),
			timers:          utils.NewMemoryCacheInstance[five9types.EventID, *time.Time](nil),
		},
	}

	service := &Service{
		agentService:      agent,
		supervisorService: supervisor,
	}

	// Set the cache to default values
	service.supervisorService.resetCache()

	for _, apply := range configFuncs {
		apply(service)
	}

	return service
}
// Service is the entry point of the package; it bundles the agent and the
// supervisor services created by NewService.
type Service struct {
agentService *AgentService
supervisorService *SupervisorService
}
// Supervisor returns the supervisor service of s.
func (s *Service) Supervisor() *SupervisorService {
return s.supervisorService
}
// Agent returns the agent service of s.
func (s *Service) Agent() *AgentService {
return s.agentService
}
// domainMetadataCache holds the cached lookup tables of the supervisor
// service, each keyed by the ID of the entity it describes.
type domainMetadataCache struct {
reasonCodeInfoState *utils.MemoryCacheInstance[five9types.ReasonCodeID, five9types.ReasonCodeInfo]
agentInfoState *utils.MemoryCacheInstance[five9types.UserID, five9types.AgentInfo]
queueInfoState *utils.MemoryCacheInstance[five9types.QueueID, five9types.QueueInfo]
}
package five9
import (
"context"
"net/http"
"github.com/equalsgibson/five9-go/five9/five9types"
)
// SupervisorService exposes the supervisor-side REST calls and the websocket
// feed together with the caches that feed fills.
type SupervisorService struct {
// authState holds the login used to authenticate and route each request.
authState *authenticationState
// webSocketHandler is the websocket transport used by StartWebsocket.
webSocketHandler webSocketHandler
// webSocketCache stores the data received over the websocket.
webSocketCache *supervisorWebSocketCache
// domainMetadataCache stores user, queue and reason code lookups.
domainMetadataCache *domainMetadataCache
}
// GetOwnUserInfo returns the AgentInfo of the logged-in supervisor.
//
// The login is obtained through getLogin rather than read from the cached
// field directly: the cached login can be nil (for example after it has been
// cleared) while the user map is still served from cache, and dereferencing it
// would panic. ErrUnknownUserID is returned when the supervisor's own ID is not
// among the domain users.
func (s *SupervisorService) GetOwnUserInfo(ctx context.Context) (five9types.AgentInfo, error) {
	login, err := s.authState.getLogin(ctx)
	if err != nil {
		return five9types.AgentInfo{}, err
	}

	users, err := s.getDomainUserInfoMap(ctx)
	if err != nil {
		return five9types.AgentInfo{}, err
	}

	self, ok := users[login.UserID]
	if !ok {
		return five9types.AgentInfo{}, ErrUnknownUserID
	}

	return self, nil
}
// GetStatisticsFilterSettings reads the statistics filter settings of the
// logged-in supervisor and returns the decoded response.
func (s *SupervisorService) GetStatisticsFilterSettings(ctx context.Context) ([]five9types.AgentInfo, error) {
	const endpoint = "/supsvcs/rs/svc/supervisors/:userID/stats_filter_settings"

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, http.NoBody)
	if err != nil {
		return nil, err
	}

	var settings []five9types.AgentInfo
	if err := s.authState.requestWithAuthentication(req, &settings); err != nil {
		return nil, err
	}

	return settings, nil
}
// SetStatisticsFilterSettings sends payload, JSON-encoded, as the new
// statistics filter settings of the logged-in supervisor and returns the
// decoded response.
func (s *SupervisorService) SetStatisticsFilterSettings(ctx context.Context, payload any) ([]five9types.AgentInfo, error) {
	const endpoint = "/supsvcs/rs/svc/supervisors/:userID/stats_filter_settings"

	req, err := http.NewRequestWithContext(ctx, http.MethodPut, endpoint, structToReaderCloser(payload))
	if err != nil {
		return nil, err
	}

	var settings []five9types.AgentInfo
	if err := s.authState.requestWithAuthentication(req, &settings); err != nil {
		return nil, err
	}

	return settings, nil
}
// getDomainUserInfoMap returns the domain users keyed by user ID. The cached
// map is used when the cache returns it without error; otherwise the users are
// fetched, indexed by ID and stored in the cache.
func (s *SupervisorService) getDomainUserInfoMap(ctx context.Context) (map[five9types.UserID]five9types.AgentInfo, error) {
	if cached, err := s.domainMetadataCache.agentInfoState.GetAll(); err == nil {
		return cached.Items, nil
	}

	users, err := s.GetAllDomainUsers(ctx)
	if err != nil {
		return nil, err
	}

	byID := make(map[five9types.UserID]five9types.AgentInfo, len(users))
	for i := range users {
		byID[users[i].ID] = users[i]
	}

	s.domainMetadataCache.agentInfoState.Replace(byID)

	return byID, nil
}
// GetAllDomainUsers fetches the users of the organization from the supervisor
// API. The cache is not consulted.
func (s *SupervisorService) GetAllDomainUsers(ctx context.Context) ([]five9types.AgentInfo, error) {
	const endpoint = "/supsvcs/rs/svc/orgs/:organizationID/users"

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, http.NoBody)
	if err != nil {
		return nil, err
	}

	var users []five9types.AgentInfo
	if err := s.authState.requestWithAuthentication(req, &users); err != nil {
		return nil, err
	}

	return users, nil
}
// getQueueInfoMap returns the queues keyed by queue ID. The cached map is used
// when the cache returns it without error; otherwise the queues are fetched,
// indexed by ID and stored in the cache.
func (s *SupervisorService) getQueueInfoMap(ctx context.Context) (map[five9types.QueueID]five9types.QueueInfo, error) {
	if cached, err := s.domainMetadataCache.queueInfoState.GetAll(); err == nil {
		return cached.Items, nil
	}

	queues, err := s.GetAllQueues(ctx)
	if err != nil {
		return nil, err
	}

	byID := make(map[five9types.QueueID]five9types.QueueInfo, len(queues))
	for i := range queues {
		byID[queues[i].ID] = queues[i]
	}

	s.domainMetadataCache.queueInfoState.Replace(byID)

	return byID, nil
}
// GetAllQueues fetches the organization's queues from the "skills" endpoint of
// the supervisor API. The cache is not consulted.
func (s *SupervisorService) GetAllQueues(ctx context.Context) ([]five9types.QueueInfo, error) {
	const endpoint = "/supsvcs/rs/svc/orgs/:organizationID/skills"

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, http.NoBody)
	if err != nil {
		return nil, err
	}

	var queues []five9types.QueueInfo
	if err := s.authState.requestWithAuthentication(req, &queues); err != nil {
		return nil, err
	}

	return queues, nil
}
// GetReasonCodeInfoMap returns the reason codes keyed by reason code ID. The
// cached map is used when the cache returns it without error; otherwise all
// reason codes are fetched, indexed by ID and stored in the cache.
func (s *SupervisorService) GetReasonCodeInfoMap(ctx context.Context) (map[five9types.ReasonCodeID]five9types.ReasonCodeInfo, error) {
	if cached, err := s.domainMetadataCache.reasonCodeInfoState.GetAll(); err == nil {
		return cached.Items, nil
	}

	codes, err := s.GetAllReasonCodes(ctx)
	if err != nil {
		return nil, err
	}

	byID := make(map[five9types.ReasonCodeID]five9types.ReasonCodeInfo, len(codes))
	for i := range codes {
		byID[codes[i].ID] = codes[i]
	}

	s.domainMetadataCache.reasonCodeInfoState.Replace(byID)

	return byID, nil
}
// GetAllReasonCodes returns the logout reason codes followed by the not-ready
// reason codes in a freshly allocated slice, which is non-nil even when both
// lists are empty. If either lookup fails, its error is returned and no codes.
func (s *SupervisorService) GetAllReasonCodes(ctx context.Context) ([]five9types.ReasonCodeInfo, error) {
	logout, err := s.getAllLogoutReasonCodes(ctx)
	if err != nil {
		return nil, err
	}

	notReady, err := s.getAllNotReadyReasonCodes(ctx)
	if err != nil {
		return nil, err
	}

	combined := make([]five9types.ReasonCodeInfo, 0, len(logout)+len(notReady))
	combined = append(combined, logout...)
	combined = append(combined, notReady...)

	return combined, nil
}
// getAllLogoutReasonCodes fetches the organization's logout reason codes via
// the supervisor API.
func (s *SupervisorService) getAllLogoutReasonCodes(ctx context.Context) ([]five9types.ReasonCodeInfo, error) {
	const endpoint = "/supsvcs/rs/svc/orgs/:organizationID/logout_reason_codes"

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, http.NoBody)
	if err != nil {
		return nil, err
	}

	var codes []five9types.ReasonCodeInfo
	if err := s.authState.requestWithAuthentication(req, &codes); err != nil {
		return nil, err
	}

	return codes, nil
}
// getAllNotReadyReasonCodes fetches the organization's not-ready reason codes
// via the supervisor API.
func (s *SupervisorService) getAllNotReadyReasonCodes(ctx context.Context) ([]five9types.ReasonCodeInfo, error) {
	const endpoint = "/supsvcs/rs/svc/orgs/:organizationID/not_ready_reason_codes"

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, http.NoBody)
	if err != nil {
		return nil, err
	}

	var codes []five9types.ReasonCodeInfo
	if err := s.authState.requestWithAuthentication(req, &codes); err != nil {
		return nil, err
	}

	return codes, nil
}
// requestWebSocketFullStatistics issues the request_full_statistics PUT for
// the logged-in supervisor. No response body is decoded.
func (s *SupervisorService) requestWebSocketFullStatistics(ctx context.Context) error {
	const endpoint = "/supsvcs/rs/svc/supervisors/:userID/request_full_statistics"

	req, err := http.NewRequestWithContext(ctx, http.MethodPut, endpoint, http.NoBody)
	if err != nil {
		return err
	}

	return s.authState.requestWithAuthentication(req, nil)
}
// UpdateAgentState is currently a stub: it ignores ctx and agentID, performs
// no request, and always returns the zero UserFullStateInfo with a nil error.
func (s *SupervisorService) UpdateAgentState(ctx context.Context, agentID five9types.UserID) (five9types.UserFullStateInfo, error) {
return five9types.UserFullStateInfo{}, nil
}
package five9
import (
"context"
"errors"
"fmt"
"time"
"github.com/equalsgibson/concur/concur"
"github.com/equalsgibson/five9-go/five9/five9types"
"github.com/equalsgibson/five9-go/five9/internal/utils"
"github.com/google/uuid"
)
// supervisorWebSocketCache holds the data received over the supervisor
// websocket.
type supervisorWebSocketCache struct {
// agentState maps a user ID to that agent's state.
agentState *utils.MemoryCacheInstance[
five9types.UserID,
five9types.AgentState,
]
// agentStatistics maps a user ID to that agent's statistics.
agentStatistics *utils.MemoryCacheInstance[
five9types.UserID,
five9types.AgentStatistics,
]
// acdState maps a queue ID to that queue's ACD state.
acdState *utils.MemoryCacheInstance[
five9types.QueueID,
five9types.ACDState,
]
// timers maps an event ID to the time that event was last recorded.
timers *utils.MemoryCacheInstance[
five9types.EventID,
*time.Time,
]
}
// StartWebsocket connects to the supervisor websocket and processes incoming
// frames until parentCtx is cancelled or an error occurs; it blocks for the
// lifetime of the connection.
//
// While connected, three goroutines run alongside the read loop: one writes a
// ping every five seconds, one checks every five seconds that a pong arrived
// recently enough, and one requests the full statistics once. A failure in any
// of them cancels the shared context, which stops the others and makes the
// function return that failure. The caches are reset both on entry and on
// return.
func (s *SupervisorService) StartWebsocket(parentCtx context.Context) error {
	// Clear any stale data from a previous connection
	s.resetCache()

	// Clear the cache when closing the connection
	defer s.resetCache()

	// If a helper goroutine fails it cancels this context with the cause,
	// thus stopping all other goroutines.
	ctx, cancel := context.WithCancelCause(parentCtx)
	// Release the context on every return path so the goroutines started below
	// cannot outlive this call.
	defer cancel(nil)

	login, err := s.authState.getLogin(ctx)
	if err != nil {
		return err
	}

	connectionURL := fmt.Sprintf("wss://%s/supsvcs/sws/%s", login.GetAPIHost(), uuid.NewString())

	if err := s.webSocketHandler.Connect(ctx, connectionURL, s.authState.client.httpClient); err != nil {
		return err
	}
	defer s.webSocketHandler.Close()

	asyncReader := concur.NewAsyncReader(s.webSocketHandler.Read)
	go asyncReader.Loop(ctx)
	defer asyncReader.Close()

	pingTicker := time.NewTicker(time.Second * 5)
	defer pingTicker.Stop()

	go func() {
		for {
			select {
			case <-pingTicker.C:
				if err := s.ping(ctx); err != nil {
					cancel(err)

					return
				}
			case <-ctx.Done():
				return
			}
		}
	}()

	pongMonitorTicker := time.NewTicker(time.Second * 5)
	defer pongMonitorTicker.Stop()

	go func() {
		for {
			select {
			// Uses its own ticker: sharing pingTicker would make the two
			// goroutines compete for the same ticks.
			case <-pongMonitorTicker.C:
				if err := s.pong(ctx); err != nil {
					cancel(err)

					return
				}
			case <-ctx.Done():
				return
			}
		}
	}()

	// Get full statistics
	go func() {
		// When starting a new session, this is called by Five9. Account for rejoining an existing session by also
		// calling this.
		if err := s.requestWebSocketFullStatistics(ctx); err != nil {
			cancel(err)
		}
	}()

	for {
		select {
		case update := <-asyncReader.Updates():
			if update.Err != nil {
				return update.Err
			}

			if err := s.handleWebsocketMessage(update.Item); err != nil {
				return err
			}
		case <-ctx.Done():
			return context.Cause(ctx)
		}
	}
}
// WSAgentState returns the cached agent states keyed by user name. Agents whose
// ID is not among the domain users are left out. A stale or not-ready cache is
// reported as ErrWebSocketCacheStale or ErrWebSocketCacheNotReady.
func (s *SupervisorService) WSAgentState(ctx context.Context) (map[five9types.UserName]five9types.AgentState, error) {
	domainUsers, err := s.getDomainUserInfoMap(ctx)
	if err != nil {
		return nil, err
	}

	cached, err := s.webSocketCache.agentState.GetAll()
	switch {
	case err == nil:
	case errors.Is(err, utils.ErrWebSocketCacheStale):
		return nil, ErrWebSocketCacheStale
	case errors.Is(err, utils.ErrWebSocketCacheNotReady):
		return nil, ErrWebSocketCacheNotReady
	default:
		return nil, err
	}

	byUserName := map[five9types.UserName]five9types.AgentState{}
	for userID, state := range cached.Items {
		if info, known := domainUsers[userID]; known {
			byUserName[info.UserName] = state
		}
	}

	return byUserName, nil
}
// WSAgentStatistics returns the cached agent statistics keyed by user name.
// Agents whose ID is not among the domain users are left out. A stale or
// not-ready cache is reported as ErrWebSocketCacheStale or
// ErrWebSocketCacheNotReady.
func (s *SupervisorService) WSAgentStatistics(ctx context.Context) (map[five9types.UserName]five9types.AgentStatistics, error) {
	domainUsers, err := s.getDomainUserInfoMap(ctx)
	if err != nil {
		return nil, err
	}

	cached, err := s.webSocketCache.agentStatistics.GetAll()
	switch {
	case err == nil:
	case errors.Is(err, utils.ErrWebSocketCacheStale):
		return nil, ErrWebSocketCacheStale
	case errors.Is(err, utils.ErrWebSocketCacheNotReady):
		return nil, ErrWebSocketCacheNotReady
	default:
		return nil, err
	}

	byUserName := map[five9types.UserName]five9types.AgentStatistics{}
	for userID, statistics := range cached.Items {
		if info, known := domainUsers[userID]; known {
			byUserName[info.UserName] = statistics
		}
	}

	return byUserName, nil
}
// WSACDState returns the cached ACD states keyed by queue name. Queues whose ID
// is not in the queue info map are left out. A stale or not-ready cache is
// reported as ErrWebSocketCacheStale or ErrWebSocketCacheNotReady.
func (s *SupervisorService) WSACDState(ctx context.Context) (map[string]five9types.ACDState, error) {
	queues, err := s.getQueueInfoMap(ctx)
	if err != nil {
		return nil, err
	}

	cached, err := s.webSocketCache.acdState.GetAll()
	switch {
	case err == nil:
	case errors.Is(err, utils.ErrWebSocketCacheStale):
		return nil, ErrWebSocketCacheStale
	case errors.Is(err, utils.ErrWebSocketCacheNotReady):
		return nil, ErrWebSocketCacheNotReady
	default:
		return nil, err
	}

	byQueueName := map[string]five9types.ACDState{}
	for queueID, state := range cached.Items {
		if info, known := queues[queueID]; known {
			byQueueName[info.Name] = state
		}
	}

	return byQueueName, nil
}
// ping writes the text frame "ping" to the websocket and returns the write
// error, if any.
func (s *SupervisorService) ping(ctx context.Context) error {
	return s.webSocketHandler.Write(ctx, []byte("ping"))
}
// pong checks the connection's liveness from the cached time of the last pong
// event. It fails when no such time is cached or when it is more than 45
// seconds old. The context argument is unused.
func (s *SupervisorService) pong(_ context.Context) error {
	const maxPongAge = 45 * time.Second

	lastPong, found := s.webSocketCache.timers.Get(five9types.EventIDPongReceived)

	switch {
	case !found:
		return errors.New("could not obtain last pong time from cache")
	case time.Since(*lastPong) > maxPongAge:
		return errors.New("last valid ping response from WS is older than 45 seconds, closing connection")
	default:
		return nil
	}
}
// resetCache drops the cached supervisor login, resets every websocket and
// domain metadata cache, and then records the current time as the last pong.
func (s *SupervisorService) resetCache() {
// Clear the login under the login mutex.
s.authState.loginMutex.Lock()
s.authState.loginResponse = nil
s.authState.loginMutex.Unlock()
s.webSocketCache.acdState.Reset()
s.webSocketCache.agentState.Reset()
s.webSocketCache.agentStatistics.Reset()
s.webSocketCache.timers.Reset()
s.domainMetadataCache.agentInfoState.Reset()
s.domainMetadataCache.queueInfoState.Reset()
s.domainMetadataCache.reasonCodeInfoState.Reset()
// Seed the pong timer after the timers reset: pong() fails when no pong time
// is cached, so a fresh connection needs a starting value.
serviceReset := time.Now()
s.webSocketCache.timers.Update(five9types.EventIDPongReceived, &serviceReset)
}
package five9
import (
"encoding/json"
"errors"
"fmt"
"time"
"github.com/equalsgibson/five9-go/five9/five9types"
)
// websocketFrameProcessingError reports a failure to process a websocket frame
// together with the bytes that could not be processed.
type websocketFrameProcessingError struct {
	// OriginalError is the underlying failure.
	OriginalError error
	// MessageBytes is the raw frame, or frame item, being processed.
	MessageBytes []byte
}

// Error implements the error interface; the text contains both the underlying
// error and the offending bytes.
func (err websocketFrameProcessingError) Error() string {
	return fmt.Sprintf("Error while processing websocket frame: %s - %s", err.OriginalError.Error(), string(err.MessageBytes))
}

// Unwrap returns the underlying error so that errors.Is and errors.As can see
// through the wrapper.
func (err websocketFrameProcessingError) Unwrap() error {
	return err.OriginalError
}
// handleWebsocketMessage decodes one websocket frame, records the time its
// event ID was received, and dispatches the payload to the handler for that
// event. Frames that do not decode or that carry no event ID are rejected with
// a websocketFrameProcessingError; events without a handler are ignored.
func (s *SupervisorService) handleWebsocketMessage(messageBytes []byte) error {
	var frame five9types.WebsocketMessage
	if err := json.Unmarshal(messageBytes, &frame); err != nil {
		return websocketFrameProcessingError{
			OriginalError: err,
			MessageBytes:  messageBytes,
		}
	}

	eventID := frame.Context.EventID
	if eventID == "" {
		return websocketFrameProcessingError{
			OriginalError: errors.New("unsupported message"),
			MessageBytes:  messageBytes,
		}
	}

	receivedAt := time.Now()
	s.webSocketCache.timers.Update(eventID, &receivedAt)

	switch eventID {
	case five9types.EventIDPongReceived:
		return s.handlerPong(frame.Payload)
	case five9types.EventIDIncrementalStatsUpdate:
		return s.handlerIncrementalStatsUpdate(frame.Payload)
	case five9types.EventIDSupervisorStats:
		return s.handlerSupervisorStats(frame.Payload)
	case five9types.EventIDServerConnected:
		return nil
	default:
		return nil
	}
}
// handlerPong validates a pong frame: the payload must be the string "pong".
func (s *SupervisorService) handlerPong(payload any) error {
	switch text := payload.(type) {
	case string:
		if text != "pong" {
			return fmt.Errorf("payload not expected type")
		}

		return nil
	default:
		return fmt.Errorf("failed type assertion for payload: %T", payload)
	}
}
// handlerIncrementalStatsUpdate applies an incremental statistics frame.
//
// The payload must be a list of objects that each name their "dataSource".
// Every item is re-encoded to JSON and decoded into the type matching its data
// source (agent state or ACD status) before being applied to the cache; items
// from other data sources are skipped. Processing stops at the first error.
func (s *SupervisorService) handlerIncrementalStatsUpdate(payload any) error {
	items, isSlice := payload.([]any)
	if !isSlice {
		return fmt.Errorf("failed type assertion for payload: %T", payload)
	}

	for _, item := range items {
		fields, isMap := item.(map[string]any)
		if !isMap {
			return fmt.Errorf("failed type assertion for payload item: %T", item)
		}

		rawSource, present := fields["dataSource"]
		if !present {
			return fmt.Errorf("data source not found: %+v", item)
		}

		sourceName, isString := rawSource.(string)
		if !isString {
			return fmt.Errorf("data source is not a string, %T", rawSource)
		}

		itemBytes, err := json.Marshal(item)
		if err != nil {
			return err
		}

		switch five9types.DataSource(sourceName) {
		case five9types.DataSourceAgentState:
			var update five9types.WebSocketIncrementalAgentStateData
			if err := json.Unmarshal(itemBytes, &update); err != nil {
				return websocketFrameProcessingError{
					OriginalError: err,
					MessageBytes:  itemBytes,
				}
			}

			if err := s.handleAgentStateUpdate(update); err != nil {
				return err
			}
		case five9types.DataSourceACDStatus:
			var update five9types.WebSocketIncrementalACDStateData
			if err := json.Unmarshal(itemBytes, &update); err != nil {
				return websocketFrameProcessingError{
					OriginalError: err,
					MessageBytes:  itemBytes,
				}
			}

			if err := s.handleACDStateUpdate(update); err != nil {
				return err
			}
		}
	}

	return nil
}
// handlerSupervisorStats applies a full statistics frame.
//
// The payload must be a list of objects that each name their "dataSource".
// For the agent state, agent statistic and ACD status sources, the item's data
// is indexed by ID and replaces the whole content of the matching cache; items
// from other data sources are skipped. Processing stops at the first error.
func (s *SupervisorService) handlerSupervisorStats(payload any) error {
	items, isSlice := payload.([]any)
	if !isSlice {
		return fmt.Errorf("failed type assertion for payload: %T", payload)
	}

	for _, item := range items {
		fields, isMap := item.(map[string]any)
		if !isMap {
			return fmt.Errorf("failed type assertion for payload item: %T", item)
		}

		rawSource, present := fields["dataSource"]
		if !present {
			return fmt.Errorf("data source not found: %+v", item)
		}

		sourceName, isString := rawSource.(string)
		if !isString {
			return fmt.Errorf("data source is not a string, %T", rawSource)
		}

		itemBytes, err := json.Marshal(item)
		if err != nil {
			return err
		}

		switch five9types.DataSource(sourceName) {
		case five9types.DataSourceAgentState:
			var snapshot five9types.WebsocketSupervisorStateData
			if err := json.Unmarshal(itemBytes, &snapshot); err != nil {
				return websocketFrameProcessingError{
					OriginalError: err,
					MessageBytes:  itemBytes,
				}
			}

			statesByID := map[five9types.UserID]five9types.AgentState{}
			for _, state := range snapshot.Data {
				statesByID[state.ID] = state
			}

			s.webSocketCache.agentState.Replace(statesByID)
		case five9types.DataSourceAgentStatistic:
			var snapshot five9types.WebsocketSupervisorStatisticsData
			if err := json.Unmarshal(itemBytes, &snapshot); err != nil {
				return websocketFrameProcessingError{
					OriginalError: err,
					MessageBytes:  itemBytes,
				}
			}

			statisticsByID := map[five9types.UserID]five9types.AgentStatistics{}
			for _, statistics := range snapshot.Data {
				statisticsByID[statistics.ID] = statistics
			}

			s.webSocketCache.agentStatistics.Replace(statisticsByID)
		case five9types.DataSourceACDStatus:
			var snapshot five9types.WebsocketSupervisorACDData
			if err := json.Unmarshal(itemBytes, &snapshot); err != nil {
				return websocketFrameProcessingError{
					OriginalError: err,
					MessageBytes:  itemBytes,
				}
			}

			queuesByID := map[five9types.QueueID]five9types.ACDState{}
			for _, queueState := range snapshot.Data {
				queuesByID[queueState.ID] = queueState
			}

			s.webSocketCache.acdState.Replace(queuesByID)
		}
	}

	return nil
}
// handleAgentStateUpdate applies an incremental agent state change to the
// cache: added entries first, then updated ones, then removals. It always
// returns nil.
func (s *SupervisorService) handleAgentStateUpdate(eventData five9types.WebSocketIncrementalAgentStateData) error {
	states := s.webSocketCache.agentState

	for _, added := range eventData.Added {
		states.Update(added.ID, added)
	}

	for _, changed := range eventData.Updated {
		states.Update(changed.ID, changed)
	}

	for _, id := range eventData.Removed {
		states.Delete(id)
	}

	return nil
}
// handleACDStateUpdate applies an incremental ACD state change to the cache:
// added entries first, then updated ones, then removals. It always returns nil.
func (s *SupervisorService) handleACDStateUpdate(eventData five9types.WebSocketIncrementalACDStateData) error {
	queues := s.webSocketCache.acdState

	for _, added := range eventData.Added {
		queues.Update(added.ID, added)
	}

	for _, changed := range eventData.Updated {
		queues.Update(changed.ID, changed)
	}

	for _, id := range eventData.Removed {
		queues.Delete(id)
	}

	return nil
}
package five9
import (
"context"
"errors"
"math"
"net/http"
"nhooyr.io/websocket"
)
// webSocketHandler abstracts the websocket transport used by the supervisor
// service so that it can be replaced through SetWebsocketHandler.
type webSocketHandler interface {
// Connect opens a connection to s using c for the HTTP handshake.
Connect(ctx context.Context, s string, c *http.Client) error
// Read returns the next message received on the connection.
Read(ctx context.Context) ([]byte, error)
// Write sends b on the connection.
Write(ctx context.Context, b []byte) error
// Close closes the connection.
Close()
}
// liveWebsocketHandler is the webSocketHandler backed by a real
// nhooyr.io/websocket connection.
type liveWebsocketHandler struct {
// c is the current connection; nil until Connect succeeds.
c *websocket.Conn
}
// Connect dials connectionURL using httpClient and stores the new connection,
// first closing any connection left over from an earlier call. If the dial
// fails, the handler is left without a connection.
func (h *liveWebsocketHandler) Connect(ctx context.Context, connectionURL string, httpClient *http.Client) error {
	if previous := h.c; previous != nil {
		previous.Close(websocket.StatusNormalClosure, "Restarting connection")
	}

	h.c = nil

	dialOptions := &websocket.DialOptions{HTTPClient: httpClient}

	conn, resp, err := websocket.Dial(ctx, connectionURL, dialOptions)
	if err != nil {
		return err
	}

	if resp != nil && resp.Body != nil {
		defer resp.Body.Close()
	}

	// Set a very large read limit
	conn.SetReadLimit(math.MaxInt32)

	h.c = conn

	return nil
}
// Read returns the next text message from the connection. Binary messages are
// rejected with an error, and calling Read before a successful Connect returns
// an error instead of dereferencing a nil connection.
func (h *liveWebsocketHandler) Read(ctx context.Context) ([]byte, error) {
	if h.c == nil {
		return nil, errors.New("websocket is not connected")
	}

	messageType, b, err := h.c.Read(ctx)
	if err != nil {
		return nil, err
	}

	if messageType != websocket.MessageText {
		return nil, errors.New("binary messages are not supported")
	}

	return b, nil
}
// Close closes the current connection with a normal-closure status. It does
// nothing when there is no connection; the error from closing is discarded.
func (h *liveWebsocketHandler) Close() {
	if h.c == nil {
		return
	}

	_ = h.c.Close(websocket.StatusNormalClosure, "closed")
}
// Write sends data as a text message on the connection. Calling Write before a
// successful Connect returns an error instead of dereferencing a nil
// connection.
func (h *liveWebsocketHandler) Write(ctx context.Context, data []byte) error {
	if h.c == nil {
		return errors.New("websocket is not connected")
	}

	return h.c.Write(ctx, websocket.MessageText, data)
}